reorganising projects

--HG--
branch : sandbox
This commit is contained in:
Andreas
2009-10-24 15:01:03 +00:00
parent f859688759
commit 6e86a18cfe
11 changed files with 0 additions and 4153 deletions

View File

@@ -1,449 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using self.canvas embedded in Tk application
Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max
"""
# system imports
import datetime
import os
import re
import logging
import sys
import warnings
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
class Acp:
"""Always correct predictor"""
def __init__(self, lows, highs):
self.lows = lows
self.highs = highs
def __call__(self, tick):
"""Always correct predictor.
Requires previous run of DelayedAcp to determine lows and highs.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
return res
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initalise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initalise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.ylow = None
self.yhigh = None
self.advance_count = 1
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # diff from polyfit
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("polyfit diff")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.x, self.y = tdl(datetime.datetime(2009, 6, 3))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.x, self.y)
self.mmh = Lohi(10)
self.w0 = 0
self.wd = 1000
self.low_high_crs = 0
xr, yr = self.tick_window(self.w0, self.wd)
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:')
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
bu3.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
mms = self.mmh.lohis
if len(mms) > 1:
mx = [ (x - int(x)) * 86400 for x in xr ]
my = yr
polyval = np.polyfit(mx, my, 10)
fit = np.polyval(polyval, mx)
self.fl.set_data(xr, fit)
# calc diff
self.dl.set_data(xr, yr - fit)
maxs = self.mmh.highs
if len(maxs) > 1:
n, x1, y1 = maxs[-2]
n, x2, y2 = maxs[-1]
x3 = xr[-1]
polyfit = np.polyfit((x1, x2), (y1, y2), 1)
y3 = np.polyval(polyfit, x3)
self.mh.set_data((x1, x2, x3), (y1, y2, y3))
mins = self.mmh.lows
if len(mins) > 1:
n, x1, y1 = mins[-2]
n, x2, y2 = mins[-1]
x3 = xr[-1]
polyfit = np.polyfit((x1, x2), (y1, y2), 1)
y3 = np.polyval(polyfit, x3)
self.ml.set_data((x1, x2, x3), (y1, y2, y3))
# update tick line
self.tl.set_data(xr, yr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.x) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], -25, +25])
def tick_window(self, w0, wd=1000):
return (self.x[w0:w0 + wd], self.y[w0:w0 + wd])
def mark_low_high(self, n):
x = self.x
y = self.y
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
def times_five(self):
self.advance_count = 5
def times_ten(self):
self.advance_count = 10
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,422 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using self.canvas embedded in Tk application
Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("c:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.ylow = None
self.yhigh = None
self.advance_count = 1
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # diff from polyfit
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("polyfit diff")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.x, self.y = tdl(datetime.datetime(2009, 6, 3))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.x, self.y)
self.mmh = Lohi(10)
self.w0 = 0
self.wd = 1000
self.low_high_crs = 0
xr, yr = self.tick_window(self.w0, self.wd)
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:')
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
bu3.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
mms = self.mmh.lohis
if len(mms) > 1:
mx = [ num2sod(x) for x in xr ]
my = yr
polyval = np.polyfit(mx, my, 10)
fit = np.polyval(polyval, mx)
self.fl.set_data(xr, fit)
# calc diff
self.dl.set_data(xr, yr - fit)
highs = self.mmh.highs
if len(highs) >= 4:
coefs = np.polyfit([num2sod(x) for n, x, y in highs[-4:]], [y for n, x, y in highs[-4:]], 3)
self.mh.set_data(xr, [np.polyval(coefs, num2sod(x)) for x in xr])
lows = self.mmh.lows
if len(lows) >= 4:
coefs = np.polyfit([num2sod(x) for n, x, y in lows[-4:]], [y for n, x, y in lows[-4:]], 3)
self.ml.set_data(xr, [np.polyval(coefs, num2sod(x)) for x in xr])
# update tick line
self.tl.set_data(xr, yr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.x) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], -25, +25])
def tick_window(self, w0, wd = 1000):
return (self.x[w0:w0 + wd], self.y[w0:w0 + wd])
def mark_low_high(self, n):
x = self.x
y = self.y
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
def times_five(self):
self.advance_count = 5
def times_ten(self):
self.advance_count = 10
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,420 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using self.canvas embedded in Tk application
Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("c:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.ylow = None
self.yhigh = None
self.advance_count = 1
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # diff from polyfit
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("polyfit diff")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.x, self.y = tdl(datetime.datetime(2009, 6, 3))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.x, self.y)
self.mmh = Lohi(10)
self.w0 = 0
self.wd = 1000
self.low_high_crs = 0
xr, yr = self.tick_window(self.w0, self.wd)
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
bu3.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
lohis = self.mmh.lohis
if len(lohis) >= 4:
n0, x0, y0 = lohis[-4]
n1, x1, y1 = lohis[-1]
x2 = xr[-1]
coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1)
self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)])
# width of trend channel
mx = 0
for n in range(n0, n1):
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.x[n])) - self.y[n]))
a, b = coefs
self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)])
self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)])
# update tick line
self.tl.set_data(xr, yr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.x) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], -25, +25])
def tick_window(self, w0, wd = 1000):
return (self.x[w0:w0 + wd], self.y[w0:w0 + wd])
def mark_low_high(self, n):
x = self.x
y = self.y
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
def times_five(self):
self.advance_count = 5
def times_ten(self):
self.advance_count = 10
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,480 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using self.canvas embedded in Tk application
Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
v.append(vol)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
del v[0]
return (x, y, v)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 1
self.ylow = None
self.yhigh = None
self.fiblo = None
self.fibhi = None
self.fibs = None
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # volume
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("volume")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
major_loc = mdates.MinuteLocator(byminute = range(0, 60, 10))
minor_loc = mdates.MinuteLocator()
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.xaxis.set_major_locator(major_loc)
self.ax1.xaxis.set_minor_locator(minor_loc)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 29))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.xs, self.ys)
self.mmh = Lohi(5)
self.w0 = 0
self.wd = 2000
self.low_high_crs = 0
xr, yr, vr = self.tick_window(self.w0, self.wd)
self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0])
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, vr, '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr, vr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.fib_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
lohis = self.mmh.lohis
if len(lohis) >= 4:
n0, x0, y0 = lohis[-4]
n1, x1, y1 = lohis[-1]
x2 = xr[-1]
coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1)
self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)])
# width of trend channel
mx = 0
for n in range(n0, n1):
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n]))
a, b = coefs
self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)])
self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)])
# update tick line
self.tl.set_data(xr, yr)
self.dl.set_data(xr, vr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], 0, 50000])
def tick_window(self, w0, wd = 1000):
return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd])
def fib_low_high(self, n):
tick = (n, self.xs[n], self.ys[n])
n, x, y = tick
hin, hix, hiy = self.fibhi
lon, lox, loy = self.fiblo
delta = hiy - loy
# 61.8, 50.0, 38.2, 23.6 %
y61 = loy + delta * 0.618
y50 = loy + delta * 0.50
y38 = loy + delta * 0.382
y23 = loy + delta * 0.236
if y < self.fiblo[2]:
self.fiblo = tick
if y > self.fibhi[2]:
self.fibhi = tick
if self.fibs is not None:
if lox > hix and y > y50:
self.fibs = None
self.fibhi = tick
if lox < hix and y < y50:
self.fibs = None
self.fiblo = tick
# create fib lines if lo hi differs more than 10 pts
if delta > 10:
xr = (min(lox, hix), x)
if self.fibs is None:
l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-')
l61, = self.ax1.plot_date(xr, (y61, y61), 'r--')
l50, = self.ax1.plot_date(xr, (y50, y50), 'r--')
l38, = self.ax1.plot_date(xr, (y38, y38), 'r--')
l23, = self.ax1.plot_date(xr, (y23, y23), 'r--')
l0, = self.ax1.plot_date(xr, (loy, loy), 'r-')
self.fibs = (l100, l61, l50, l38, l23, l0)
else:
l100, l61, l50, l38, l23, l0 = self.fibs
l100.set_data(xr, (hiy, hiy))
l61.set_data(xr, (y61, y61))
l50.set_data(xr, (y50, y50))
l38.set_data(xr, (y38, y38))
l23.set_data(xr, (y23, y23))
l0.set_data(xr, (loy, loy))
def mark_low_high(self, n):
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,18 +0,0 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
""" Globals
Global variables
Note: this module implements the singleton pattern.
"""
# system imports
# local imports
# constants
RTTRD_VAR = "d:\\rttrd-prd-var"
PAD_DATA = "d:\\rttrd-dev-var\\analysis\\data"

View File

@@ -1,174 +0,0 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using blit method
"""
# system imports
import Tkinter as tk
import datetime
import os
import re
import logging
import sys
import matplotlib.pyplot as plt
from matplotlib.dates import date2num
import matplotlib.dates as mdates
import numpy as np
# local imports
# constants
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
def main():
LOG.debug("Loading ticks...")
x, y = tdl(datetime.datetime(2009,6,3))
LOG.debug("Ticks loaded.")
fig = plt.figure()
ax1 = fig.add_subplot(311) # ticks
canvas = ax1.figure.canvas
# ax2 = fig.add_subplot(312) # gearing
# ax3 = fig.add_subplot(313) # cash
ax1.set_ylabel("ticks")
# ax2.set_ylabel("gearing")
# ax3.set_ylabel("cash")
xr = [x[0]] * 500
yr = [y[0]] * 500
line, = ax1.plot_date(xr, yr, '-', xdate = True)
major_fmt = mdates.DateFormatter('%H:%M:%S')
ax1.xaxis.set_major_formatter(major_fmt)
# ax1.axis('tight')
# format the coords message box
def price(x): return '%1.2f'%x
# ax.format_xdata = mdates.DateFormatter('%Y-%m-%d')
ax1.format_xdata = mdates.DateFormatter('%H:%M:%S')
ax1.format_ydata = price
ax1.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
background = canvas.copy_from_bbox(ax1.bbox)
def animate():
w = 1000
bias = 10
ymin = min(y[0:100])
ymax = max(y[0:100])
low = ymin - bias
high = ymax + bias
trend = 0
for i in range(0, len(x)-w):
# restore the clean slate background
canvas.restore_region(background)
# prepare timeline window
xr = x[i:i+w]
yr = y[i:i+w]
# update line
line.set_xdata(xr)
line.set_ydata(yr)
# determine y axis
if yr[-1] > ymax:
ymax = yr[-1]
if ymax - 50 < min(yr):
ymin = ymax - 50
if yr[-1] < ymin:
ymin = yr[-1]
if ymin + 50 > max(yr):
ymax = ymin + 50
ax1.axis([xr[0], xr[-1], ymin, ymax])
# check low high and annotate
if i > 0 and i % 150 == 0:
ax1.annotate('low/high',
xy = (xr[-10], yr[-10]),
xytext = (xr[-5], yr[-10] + 10),
arrowprops = dict(facecolor = 'black',
frac = 0.2,
headwidth = 10,
linewidth = 0.1,
shrink = 0.05))
# just draw the animated artist
# FIXME: redraw all items in graph
ax1.draw_artist(line)
# just redraw the axes rectangle
canvas.blit(ax1.bbox)
root = fig.canvas.manager.window
root.after(100, animate)
plt.show()
if __name__ == "__main__":
main()

View File

@@ -1,163 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using draw method
"""
# system imports
import Tkinter as tk
import datetime
import os
import re
import logging
import sys
import matplotlib.pyplot as plt
from matplotlib.dates import date2num
import matplotlib.dates as mdates
import numpy as np
# local imports
# constants
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
def main():
LOG.debug("Loading ticks...")
x, y = tdl(datetime.datetime(2009,6,3))
LOG.debug("Ticks loaded.")
fig = plt.figure()
ax1 = fig.add_subplot(311) # ticks
# ax2 = fig.add_subplot(312) # gearing
# ax3 = fig.add_subplot(313) # cash
ax1.set_ylabel("ticks")
# ax2.set_ylabel("gearing")
# ax3.set_ylabel("cash")
xr = [x[0]] * 500
yr = [y[0]] * 500
line, = ax1.plot_date(xr, yr, '-', xdate = True)
major_fmt = mdates.DateFormatter('%H:%M:%S')
ax1.xaxis.set_major_formatter(major_fmt)
# ax1.axis('tight')
# format the coords message box
def price(x): return '%1.2f'%x
# ax.format_xdata = mdates.DateFormatter('%Y-%m-%d')
ax1.format_xdata = mdates.DateFormatter('%H:%M:%S')
ax1.format_ydata = price
ax1.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
def animate():
w = 1000
bias = 10
ymin = min(y[0:100])
ymax = max(y[0:100])
low = ymin - bias
high = ymax + bias
trend = 0
for i in range(0, len(x)-w):
# prepare timeline window
xr = x[i:i+w]
yr = y[i:i+w]
# update line
line.set_xdata(xr)
line.set_ydata(yr)
# determine y axis
if yr[-1] > ymax:
ymax = yr[-1]
if ymax - 50 < min(yr):
ymin = ymax - 50
if yr[-1] < ymin:
ymin = yr[-1]
if ymin + 50 > max(yr):
ymax = ymin + 50
ax1.axis([xr[0], xr[-1], ymin, ymax])
# check low high and annotate
if i > 0 and i % 150 == 0:
ax1.annotate('low/high',
xy = (xr[-10], yr[-10]),
xytext = (xr[-5], yr[-10] + 10),
arrowprops = dict(facecolor = 'black',
frac = 0.2,
headwidth = 10,
linewidth = 0.1,
shrink = 0.05))
fig.canvas.draw()
root = fig.canvas.manager.window
root.after(100, animate)
plt.show()
if __name__ == "__main__":
main()

View File

@@ -1,330 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" animated drawing of ticks
using self.canvas embedded in Tk application
Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max
"""
# system imports
import datetime
import os
import re
import logging
import sys
import warnings
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
# constants
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
return (x, y)
class Mmh:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.mms = [ ]
self.mins = [ ]
self.maxs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initalise water mark
self.mm0 = tick
res = self.mm0
self.mins = [(n, cdt, last - 1)]
self.maxs = [(n, cdt, last + 1)]
else:
# initalise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.mms.insert(0, self.mm0)
self.maxs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.mms.insert(0, self.mm0)
self.mins.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return res
class Main:
def __init__(self):
LOG.debug("Loading ticks...")
self.x, self.y = tdl(datetime.datetime(2009,6,3))
LOG.debug("Ticks loaded.")
self.mmh = Mmh(10)
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
fig = plt.figure()
self.ax1 = fig.add_subplot(111) # ticks
# ax2 = fig.add_subplot(312) # gearing
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
# ax2.set_ylabel("gearing")
# ax3.set_ylabel("cash")
self.w = 1000
self.bias = 10
xr = self.x[0:self.w]
yr = self.y[0:self.w]
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit, ) * self.w, 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'g:')
self.ml, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'r:')
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.format_xdata = mdates.DateFormatter('%H:%M:%S')
self.ax1.format_ydata = lambda x: '%1.2f'%x
self.ax1.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
self.ax1.axis([xr[0], xr[-1]+180./86400., min(yr), max(yr)])
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master = self.root)
bu1 = Tk.Button(master = fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master = fr1, text='Stop', command=self.stop)
bu3 = Tk.Button(master = fr1, text='Resume', command=self.resume)
bu1.pack(side = Tk.RIGHT, padx = 5, pady = 5)
bu2.pack(side = Tk.RIGHT, padx = 5, pady = 5)
bu3.pack(side = Tk.RIGHT, padx = 5, pady = 5)
fr1.pack(side = Tk.BOTTOM)
def animate_start(self):
warnings.simplefilter("default", np.RankWarning)
self.ymin = min(self.y[0:self.w])
self.ymax = max(self.y[0:self.w])
self.low = self.ymin - self.bias
self.high = self.ymax + self.bias
self.trend = 0
self.i = 0
for n in range(0, self.w):
self.mark_low_high(n)
self.root.after(500, self.animate_step)
def animate_step(self):
# prepare timeline window
xr = np.array( self.x[self.i:self.i+self.w] )
yr = np.array( self.y[self.i:self.i+self.w] )
# update line
self.tl.set_xdata(xr)
self.tl.set_ydata(yr)
# determine y axis
if yr[-1] > self.ymax:
self.ymax = yr[-1]
if self.ymax - 50 < min(yr):
self.ymin = self.ymax - 50
if yr[-1] < self.ymin:
self.ymin = yr[-1]
if self.ymin + 50 > max(yr):
self.ymax = self.ymin + 50
# check self.low self.high and annotate
fwd = 2
for n in range(self.i+self.w, self.i+self.w+fwd):
self.mark_low_high(n)
self.i += fwd
# build polynomial fit
mms = self.mmh.mms
if len(mms) > 1:
# mx = [ (x - int(x)) * 86400 for n, x, y in mms[:4] ]
# my = [ y for n, x, y in mms[:4] ]
mx = [ (x - int(x)) * 86400 for x in xr ]
my = yr
xre = [ xr[-1] + x/86400. for x in range(1, 181)]
xr2 = np.append(xr, xre)
# print "mx: ", mx
# print "my: ", my
polyval = np.polyfit(mx, my, 30)
# print "poly1d: ", polyval
intx = np.array(xr, dtype = int)
sodx = xr - intx
sodx *= 86400
s2x = np.append(sodx, range(sodx[-1], sodx[-1]+180))
fit = np.polyval(polyval, s2x)
self.fl.set_xdata(xr2)
self.fl.set_ydata(fit)
maxs = self.mmh.maxs
if len(maxs) > 1:
n, x1, y1 = maxs[-2]
n, x2, y2 = maxs[-1]
x3 = xr[-1]
polyfit = np.polyfit((x1, x2), (y1, y2), 1)
y3 = np.polyval(polyfit, x3)
self.mh.set_data((x1, x2, x3), (y1, y2, y3))
mins = self.mmh.mins
if len(mins) > 1:
n, x1, y1 = mins[-2]
n, x2, y2 = mins[-1]
x3 = xr[-1]
polyfit = np.polyfit((x1, x2), (y1, y2), 1)
y3 = np.polyval(polyfit, x3)
self.ml.set_data((x1, x2, x3), (y1, y2, y3))
# draw
self.ax1.axis([xr[0], xr[-1]+180./86400., self.ymin, self.ymax])
self.canvas.draw()
if self.i < len(self.x)-self.w-1:
self.after_id = self.root.after(10, self.animate_step)
def build_fit(self):
pass
def mark_low_high(self, n):
x = self.x
y = self.y
rc = self.mmh((n, x[n], y[n]))
if rc:
nlh, xlh, ylh = rc
if self.mmh.trend > 0:
# low
self.ax1.annotate('low',
xy = (x[nlh], y[nlh]),
xytext = (x[n], y[nlh]),
arrowprops = dict(facecolor = 'red',
shrink = 0.05))
# an.set_annotation_clip(True)
elif self.mmh.trend < 0:
# high
self.ax1.annotate('high',
xy = (x[nlh], y[nlh]),
xytext = (x[n], y[nlh]),
arrowprops = dict(facecolor = 'green',
shrink = 0.05))
# an.set_annotation_clip(True)
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate_step)
def run(self):
self.root.after(500, self.animate_start)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,499 +0,0 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" patterns and distance
A. collect data
1. one tick every minute 10-20 mins back
2. use LoHi max until market close as performance indicator
B. cluster and analyse data according to distance
1. find clusters with net positive P&L. many clusters will exhibit useless patterns
C. check performance with backtest
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
v.append(vol)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
del v[0]
return (x, y, v)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
def harvest_patterns():
LOG.debug("Loading ticks...")
xs, ys, vs = tdl(datetime.datetime(2009, 6, 25))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(xs, ys)
def analyse_patterns():
pass
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 1
self.ylow = None
self.yhigh = None
self.fiblo = None
self.fibhi = None
self.fibs = None
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # volume
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("volume")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
major_loc = mdates.MinuteLocator(byminute = range(0, 60, 10))
minor_loc = mdates.MinuteLocator()
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.xaxis.set_major_locator(major_loc)
self.ax1.xaxis.set_minor_locator(minor_loc)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 25))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.xs, self.ys)
self.mmh = Lohi(5)
self.w0 = 0
self.wd = 2000
self.low_high_crs = 0
xr, yr, vr = self.tick_window(self.w0, self.wd)
self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0])
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, vr, '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr, vr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.fib_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
lohis = self.mmh.lohis
if len(lohis) >= 4:
n0, x0, y0 = lohis[-4]
n1, x1, y1 = lohis[-1]
x2 = xr[-1]
coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1)
self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)])
# width of trend channel
mx = 0
for n in range(n0, n1):
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n]))
a, b = coefs
self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)])
self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)])
# update tick line
self.tl.set_data(xr, yr)
self.dl.set_data(xr, vr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], 0, 50000])
def tick_window(self, w0, wd = 1000):
return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd])
def fib_low_high(self, n):
tick = (n, self.xs[n], self.ys[n])
redraw = False
n, x, y = tick
hin, hix, hiy = self.fibhi
lon, lox, loy = self.fiblo
delta = hiy - loy
# 61.8, 50.0, 38.2, 23.6 %
y61 = loy + delta * 0.618
y50 = loy + delta * 0.50
y38 = loy + delta * 0.382
y23 = loy + delta * 0.236
if y < self.fiblo[2]:
self.fiblo = tick
if y > self.fibhi[2]:
self.fibhi = tick
if self.fibs is not None:
if lox > hix and y > y50:
self.fibs = None
self.fibhi = tick
if lox < hix and y < y50:
self.fibs = None
self.fiblo = tick
# create fib lines if lo hi differs more than 10 pts
if delta > 10:
xr = (min(lox, hix), x)
if self.fibs is None:
l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-')
l61, = self.ax1.plot_date(xr, (y61, y61), 'r--')
l50, = self.ax1.plot_date(xr, (y50, y50), 'r--')
l38, = self.ax1.plot_date(xr, (y38, y38), 'r--')
l23, = self.ax1.plot_date(xr, (y23, y23), 'r--')
l0, = self.ax1.plot_date(xr, (loy, loy), 'r-')
self.fibs = (l100, l61, l50, l38, l23, l0)
else:
l100, l61, l50, l38, l23, l0 = self.fibs
l100.set_data(xr, (hiy, hiy))
l61.set_data(xr, (y61, y61))
l50.set_data(xr, (y50, y50))
l38.set_data(xr, (y38, y38))
l23.set_data(xr, (y23, y23))
l0.set_data(xr, (loy, loy))
def mark_low_high(self, n):
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,594 +0,0 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
"""
Online sliding window with trend analysis
1. segment tick data with a sliding window alogrithm
2. recognise low/high points by comparing slope information
3. recognise trend by observing low/high point difference
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib as mpl
mpl.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
v.append(vol)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
del v[0]
return (x, y, v)
def interpolate_line(xs, ys):
"""Fit a straight line y = bx + a to a set of points (x, y) """
# from two data points only!
x1, x2 = xs
y1, y2 = ys
try:
b = ( y2 - y1 ) / ( x2 - x1 )
except ZeroDivisionError:
print "interpolate_line: division by zero, ", x1, x2, y1, y2
b = 0.0
a = y1 - b * x1
return (b, a)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class TimedLohi:
"""Time series online low and high detector.
Confirms low/high candidates after timeout.
Time dependent.
"""
def __init__(self, bias, timeout = ONE_MINUTE):
assert(bias > 0)
self.bias = bias
self.timeout = timeout
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - self.timeout and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - self.timeout and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class SlidingWindow:
"""Douglas-Peucker algorithm."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.xs = [ ]
self.ys = [ ]
self.segx = [ ]
self.segy = [ ]
self.types = [ ]
self.bs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
max_distance = self.bias
rc = None
self.xs.append(cdt)
self.ys.append(last)
x0, y0 = (self.xs[0], self.ys[0])
x1, y1 = (self.xs[-1], self.ys[-1])
if n == 0:
self.segx.append(x0)
self.segy.append(y0)
if len(self.xs) < 2:
return None
# check distance
coefs = interpolate_line((x0, x1), (y0, y1))
ip_ys = np.polyval(coefs, self.xs)
d_ys = np.absolute(self.ys - ip_ys)
d_max = np.amax(d_ys)
if d_max > max_distance:
n = np.argmax(d_ys)
x2, y2 = (self.xs[n], self.ys[n])
self.segx.append(x2)
self.segy.append(y2)
segment_added = True
# store slope of segment
b0, a0 = interpolate_line((x0, x2), (y0, y2))
self.bs.append(b0)
# remove ticks of previous segment
del self.xs[0:n]
del self.ys[0:n]
# slope of current segment
x0, y0 = (self.xs[0], self.ys[0])
b1, a1 = interpolate_line((x0, x1), (y0, y1))
lohi = self.get_type(b0, b1)
rc = (x2, y2, lohi)
return (self.segx + [x1], self.segy + [y1], rc)
def get_type(self, b0, b1):
""" calculate gearing
y: previous slope, x: current slope
<0 ~0 >0
<0 L L L
~0 H 0 L
>0 H H H
"""
if b0 < -SMALL and b1 < -SMALL and b0 > b1:
lohi = "d+"
elif b0 < -SMALL and b1 < SMALL and b0 < b1:
lohi = "d-"
elif b0 < -SMALL and b1 > SMALL:
lohi = "L"
elif abs(b0) < SMALL and b1 < -SMALL:
lohi = "d+"
elif abs(b0) < SMALL and abs(b1) < SMALL:
lohi = "0"
elif abs(b0) < SMALL and b1 > SMALL:
lohi = "u+"
elif b0 > SMALL and b1 < -SMALL:
lohi = "H"
elif b0 > SMALL and b1 > -SMALL and b0 > b1:
lohi = "u-"
elif b0 > SMALL and b1 > SMALL and b0 < b1:
lohi = "u+"
else:
lohi = "?"
return lohi
SMALL = 1E-10
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 10
self.ylow = None
self.yhigh = None
self.segs = [ ]
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
# self.ax2 = fig.add_subplot(312) # slope of line segement
self.ax3 = fig.add_subplot(212) # moving average (10min)
self.ax1.set_ylabel("ticks")
# self.ax2.set_ylabel("slope")
self.ax3.set_ylabel("gearing")
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
"""
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
"""
self.ax3.xaxis.set_major_formatter(major_fmt)
self.ax3.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax3.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax3.format_xdata = major_fmt
self.ax3.format_ydata = lambda x: '%1.2f' % x
self.ax3.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.xs, self.ys)
self.mas = self.ys[:]
self.ss = [ 0 ] * len(self.xs)
self.gs = [ 0 ] * len(self.xs)
self.mmh = TimedLohi(5)
self.osw = SlidingWindow(2)
self.w0 = 0
self.wd = 2000
self.low_high_crs = 0
xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd)
self.n0 = 0
# top subplot
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
# volume subplot
# self.dl, = self.ax2.plot_date(xr, vr, '-')
self.dl, = self.ax1.plot_date(xr, vr, 'g-')
# slope subplot
# self.sl, = self.ax2.plot_date(xr, sr, '-')
# gearing subplot
self.gl, = self.ax3.plot_date(xr, gr, '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
# self.mark_low_high(self.low_high_crs)
self.mark_segments(self.low_high_crs)
self.ma(self.low_high_crs, 10)
self.low_high_crs += 1
# update tick line
self.tl.set_data(xr, yr)
# update segment slope
# self.sl.set_data(xr, sr)
# update volume line
self.dl.set_data(xr, vr)
# gearing line
self.gl.set_data(xr, gr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
# self.ax2.axis([xr[0], xr[-1], -5, +5])
self.ax3.axis([xr[0], xr[-1], -50, +50])
def tick_window(self, w0, wd = 1000):
return (self.xs[w0:w0 + wd],
self.ys[w0:w0 + wd],
self.mas[w0:w0 + wd],
self.ss[w0:w0 + wd],
self.gs[w0:w0+wd])
def ma(self, n0, min):
self.mas[n0] = np.average(self.ys[n0-min*60:n0])
self.gs[n0] = self.ys[n0] - self.mas[n0] + self.ss[n0]
def mark_segments(self, n):
x = self.xs
y = self.ys
rc = self.osw((n, x[n], y[n]))
if rc is not None:
segx, segy, lohi = rc
self.seg.set_data(segx, segy)
if lohi is not None:
text = lohi[2]
if text == "u+":
fc = "blue"
dy = -15
elif text == "d+":
fc = "blue"
dy = +15
elif text == "H":
fc = "green"
dy = +15
elif text == "L":
fc = "red"
dy = -15
else:
fc = None
if fc:
self.ax1.annotate(text,
xy=(lohi[0], lohi[1]),
xytext=(segx[-1], segy[-2]+dy),
arrowprops=dict(facecolor=fc,
frac=0.3,
shrink=0.1))
def mark_low_high(self, n):
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()

View File

@@ -1,604 +0,0 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
"""
Online sliding window with trend analysis
1. segment tick data with a sliding window alogrithm
2. recognise low/high points by comparing slope information
3. recognise trend by observing low/high point difference
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib as mpl
mpl.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
from namedtuple import NamedTuple
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
Trend = NamedTuple('Trend', 'n x y')
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
v.append(vol)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
del v[0]
return (x, y, v)
def interpolate_line(xs, ys):
"""Fit a straight line y = bx + a to a set of points (x, y) """
# from two data points only!
x1, x2 = xs
y1, y2 = ys
try:
b = ( y2 - y1 ) / ( x2 - x1 )
except ZeroDivisionError:
print "interpolate_line: division by zero, ", x1, x2, y1, y2
b = 0.0
a = y1 - b * x1
return (b, a)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Bunch:
def __init__(self, **kwds):
self.__dict__.update(kwds)
class TimedLohi:
"""Time series online low and high detector.
Confirms low/high candidates after timeout.
Time dependent.
"""
def __init__(self, bias, timeout = ONE_MINUTE):
assert(bias > 0)
self.bias = bias
self.timeout = timeout
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - self.timeout and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - self.timeout and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
class SlidingWindow:
"""Douglas-Peucker algorithm."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.xs = [ ]
self.ys = [ ]
self.segx = [ ]
self.segy = [ ]
self.types = [ ]
self.bs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
max_distance = self.bias
rc = None
self.xs.append(cdt)
self.ys.append(last)
x0, y0 = (self.xs[0], self.ys[0])
x1, y1 = (self.xs[-1], self.ys[-1])
if n == 0:
self.segx.append(x0)
self.segy.append(y0)
if len(self.xs) < 2:
return None
# check distance
coefs = interpolate_line((x0, x1), (y0, y1))
ip_ys = np.polyval(coefs, self.xs)
d_ys = np.absolute(self.ys - ip_ys)
d_max = np.amax(d_ys)
if d_max > max_distance:
n = np.argmax(d_ys)
x2, y2 = (self.xs[n], self.ys[n])
self.segx.append(x2)
self.segy.append(y2)
segment_added = True
# store slope of segment
b0, a0 = interpolate_line((x0, x2), (y0, y2))
self.bs.append(b0)
# remove ticks of previous segment
del self.xs[0:n]
del self.ys[0:n]
# slope of current segment
x0, y0 = (self.xs[0], self.ys[0])
b1, a1 = interpolate_line((x0, x1), (y0, y1))
lohi = self.get_type(b0, b1)
rc = (x2, y2, lohi)
return (self.segx + [x1], self.segy + [y1], rc)
def get_type(self, b0, b1):
""" calculate gearing
y: previous slope, x: current slope
<0 ~0 >0
<0 L L L
~0 H 0 L
>0 H H H
"""
if b0 < -SMALL and b1 < -SMALL and b0 > b1:
lohi = "d+"
elif b0 < -SMALL and b1 < SMALL and b0 < b1:
lohi = "d-"
elif b0 < -SMALL and b1 > SMALL:
lohi = "L"
elif abs(b0) < SMALL and b1 < -SMALL:
lohi = "d+"
elif abs(b0) < SMALL and abs(b1) < SMALL:
lohi = "0"
elif abs(b0) < SMALL and b1 > SMALL:
lohi = "u+"
elif b0 > SMALL and b1 < -SMALL:
lohi = "H"
elif b0 > SMALL and b1 > -SMALL and b0 > b1:
lohi = "u-"
elif b0 > SMALL and b1 > SMALL and b0 < b1:
lohi = "u+"
else:
lohi = "?"
return lohi
SMALL = 1E-10
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 10
self.ylow = None
self.yhigh = None
self.trend_starts = None
self.segs = [ ]
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # moving average (10min)
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("gearing")
major_fmt = mdates.DateFormatter('%H:%M:%S')
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.xs, self.ys)
self.mas = self.ys[:]
self.mmh = TimedLohi(5)
self.osw = SlidingWindow(2)
self.w0 = 0
self.wd = 2000
self.w_crs = 0
xr, yr, mar = self.tick_window(self.w0, self.wd)
self.gr = [0.0] * self.wd
# add artists to top subplot
# tick line and segments
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
# trend lines
self.trd, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k--')
self.trh, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-')
self.trl, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-')
# add artists to bottom subplot
self.gl, = self.ax2.plot_date(xr, self.gr, '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
while self.w_crs < self.w0 + self.wd:
self.ma(self.w_crs, 10)
self.fitter(self.w_crs)
self.w_crs += 1
xr, yr, mar = self.tick_window(self.w0, self.wd)
# update tick line
self.tl.set_data(xr, yr)
# gearing line
# self.gl.set_data(xr, gr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], -50, +50])
def tick_window(self, w0, wd = 1000):
return (self.xs[w0:w0 + wd],
self.ys[w0:w0 + wd],
self.mas[w0:w0 + wd],
)
def ma(self, n0, min):
self.mas[n0] = np.average(self.ys[n0-min*60:n0])
def fitter(self, n0):
# find last low/high within t-1
# linear regression from t-5 to t-1
# linear regression within t-1
# visual inspection
# determine run-on low and highs
if self.trend_starts is None:
self.trend_starts = [Trend(n=n0, x=self.xs[n0], y=self.ys[n0])]
trend_start = self.trend_starts[-1]
# wait for 30 secs to stabilise
if trend_start.n + 30 > n0:
return
# fit trend
xr = self.xs[trend_start.n:n0]
yr = self.ys[trend_start.n:n0]
ps = np.polyfit(xr, yr, 1)
trend_xs = [xr[0], xr[-1]]
trend_ys = np.polyval(ps, trend_xs)
self.trd.set_data(trend_xs, trend_ys)
# fit counter trend
def mark_segments(self, n):
x = self.xs
y = self.ys
rc = self.osw((n, x[n], y[n]))
if rc is not None:
segx, segy, lohi = rc
self.seg.set_data(segx, segy)
if lohi is not None:
text = lohi[2]
if text == "u+":
fc = "blue"
dy = -15
elif text == "d+":
fc = "blue"
dy = +15
elif text == "H":
fc = "green"
dy = +15
elif text == "L":
fc = "red"
dy = -15
else:
fc = None
if fc:
self.ax1.annotate(text,
xy=(lohi[0], lohi[1]),
xytext=(segx[-1], segy[-2]+dy),
arrowprops=dict(facecolor=fc,
frac=0.3,
shrink=0.1))
def mark_low_high(self, n):
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()