From 6e86a18cfeb4d1fc7916bb7d5083d1fa7e14432c Mon Sep 17 00:00:00 2001 From: Andreas Date: Sat, 24 Oct 2009 15:01:03 +0000 Subject: [PATCH] reorganising projects --HG-- branch : sandbox --- mpl/fibionacci.py | 449 -------------------------------- mpl/fibionacci2.py | 422 ------------------------------- mpl/fibionacci3.py | 420 ------------------------------ mpl/fibionacci4.py | 480 ----------------------------------- mpl/globals.py | 18 -- mpl/mpl-blit.py | 174 ------------- mpl/mpl-draw.py | 163 ------------ mpl/mpl-embedded.py | 330 ------------------------ mpl/pad1.py | 499 ------------------------------------ mpl/sw-trend1.py | 594 ------------------------------------------- mpl/sw-trend2.py | 604 -------------------------------------------- 11 files changed, 4153 deletions(-) delete mode 100644 mpl/fibionacci.py delete mode 100644 mpl/fibionacci2.py delete mode 100644 mpl/fibionacci3.py delete mode 100644 mpl/fibionacci4.py delete mode 100644 mpl/globals.py delete mode 100644 mpl/mpl-blit.py delete mode 100644 mpl/mpl-draw.py delete mode 100644 mpl/mpl-embedded.py delete mode 100644 mpl/pad1.py delete mode 100644 mpl/sw-trend1.py delete mode 100644 mpl/sw-trend2.py diff --git a/mpl/fibionacci.py b/mpl/fibionacci.py deleted file mode 100644 index 0b19902..0000000 --- a/mpl/fibionacci.py +++ /dev/null @@ -1,449 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using self.canvas embedded in Tk application - - Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max -""" - -# system imports - -import datetime -import os -import re -import logging -import sys -import warnings - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - - -class Lohi: - """Time series online low and high detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -class Acp: - """Always correct predictor""" - def __init__(self, lows, highs): - self.lows = lows - self.highs = highs - - def __call__(self, tick): - """Always correct predictor. - - Requires previous run of DelayedAcp to determine lows and highs. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - return res - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initalise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initalise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.ylow = None - self.yhigh = None - self.advance_count = 1 - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # diff from polyfit - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("polyfit diff") - # ax3.set_ylabel("cash") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.x, self.y = tdl(datetime.datetime(2009, 6, 3)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.x, self.y) - - self.mmh = Lohi(10) - - self.w0 = 0 - self.wd = 1000 - self.low_high_crs = 0 - xr, yr = self.tick_window(self.w0, self.wd) - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:') - self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:') - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - bu3.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.low_high_crs += 1 - # build polynomial fit - mms = self.mmh.lohis - if len(mms) > 1: - mx = [ (x - int(x)) * 86400 for x in xr ] - my = yr - polyval = np.polyfit(mx, my, 10) - fit = np.polyval(polyval, mx) - self.fl.set_data(xr, fit) - # calc diff - self.dl.set_data(xr, yr - fit) - maxs = self.mmh.highs - if len(maxs) > 1: - n, x1, y1 = maxs[-2] - n, x2, y2 = maxs[-1] - x3 = xr[-1] - polyfit = np.polyfit((x1, x2), (y1, y2), 1) - y3 = np.polyval(polyfit, x3) - self.mh.set_data((x1, x2, x3), (y1, y2, y3)) - mins = self.mmh.lows - if len(mins) > 1: - n, x1, y1 = mins[-2] - n, x2, y2 = mins[-1] - x3 = xr[-1] - polyfit = np.polyfit((x1, x2), (y1, y2), 1) - y3 = np.polyval(polyfit, x3) - self.ml.set_data((x1, x2, x3), (y1, y2, y3)) - # update tick line - self.tl.set_data(xr, yr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.x) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], -25, +25]) - - def tick_window(self, w0, wd=1000): - return (self.x[w0:w0 + wd], self.y[w0:w0 + wd]) - - def mark_low_high(self, n): - x = self.x - y = self.y - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - - def times_five(self): - self.advance_count = 5 - - def times_ten(self): - self.advance_count = 10 - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/fibionacci2.py b/mpl/fibionacci2.py deleted file mode 100644 index ec7896d..0000000 --- a/mpl/fibionacci2.py +++ /dev/null @@ -1,422 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using self.canvas embedded in Tk application - - Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("c:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - -class Lohi: - """Time series online low and high detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.ylow = None - self.yhigh = None - self.advance_count = 1 - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # diff from polyfit - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("polyfit diff") - # ax3.set_ylabel("cash") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.x, self.y = tdl(datetime.datetime(2009, 6, 3)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.x, self.y) - - self.mmh = Lohi(10) - - self.w0 = 0 - self.wd = 1000 - self.low_high_crs = 0 - xr, yr = self.tick_window(self.w0, self.wd) - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:') - self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:') - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - bu3.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.low_high_crs += 1 - # build polynomial fit - mms = self.mmh.lohis - if len(mms) > 1: - mx = [ num2sod(x) for x in xr ] - my = yr - polyval = np.polyfit(mx, my, 10) - fit = np.polyval(polyval, mx) - self.fl.set_data(xr, fit) - # calc diff - self.dl.set_data(xr, yr - fit) - highs = self.mmh.highs - if len(highs) >= 4: - coefs = np.polyfit([num2sod(x) for n, x, y in highs[-4:]], [y for n, x, y in highs[-4:]], 3) - self.mh.set_data(xr, [np.polyval(coefs, num2sod(x)) for x in xr]) - lows = self.mmh.lows - if len(lows) >= 4: - coefs = np.polyfit([num2sod(x) for n, x, y in lows[-4:]], [y for n, x, y in lows[-4:]], 3) - self.ml.set_data(xr, [np.polyval(coefs, num2sod(x)) for x in xr]) - # update tick line - self.tl.set_data(xr, yr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.x) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], -25, +25]) - - def tick_window(self, w0, wd = 1000): - return (self.x[w0:w0 + wd], self.y[w0:w0 + wd]) - - def mark_low_high(self, n): - x = self.x - y = self.y - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - - def times_five(self): - self.advance_count = 5 - - def times_ten(self): - self.advance_count = 10 - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/fibionacci3.py b/mpl/fibionacci3.py deleted file mode 100644 index f714e6e..0000000 --- a/mpl/fibionacci3.py +++ /dev/null @@ -1,420 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using self.canvas embedded in Tk application - - Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("c:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - -class Lohi: - """Time series online low and high detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.ylow = None - self.yhigh = None - self.advance_count = 1 - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # diff from polyfit - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("polyfit diff") - # ax3.set_ylabel("cash") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.x, self.y = tdl(datetime.datetime(2009, 6, 3)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.x, self.y) - - self.mmh = Lohi(10) - - self.w0 = 0 - self.wd = 1000 - self.low_high_crs = 0 - xr, yr = self.tick_window(self.w0, self.wd) - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:') - self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:') - # Acp markers - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - self.dl, = self.ax2.plot_date(xr, (0,) * len(xr), '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - bu3.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.low_high_crs += 1 - # build polynomial fit - lohis = self.mmh.lohis - if len(lohis) >= 4: - n0, x0, y0 = lohis[-4] - n1, x1, y1 = lohis[-1] - x2 = xr[-1] - coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1) - self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)]) - # width of trend channel - mx = 0 - for n in range(n0, n1): - mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.x[n])) - self.y[n])) - a, b = coefs - self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)]) - self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)]) - # update tick line - self.tl.set_data(xr, yr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.x) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], -25, +25]) - - def tick_window(self, w0, wd = 1000): - return (self.x[w0:w0 + wd], self.y[w0:w0 + wd]) - - def mark_low_high(self, n): - x = self.x - y = self.y - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - - def times_five(self): - self.advance_count = 5 - - def times_ten(self): - self.advance_count = 10 - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/fibionacci4.py b/mpl/fibionacci4.py deleted file mode 100644 index fb7a391..0000000 --- a/mpl/fibionacci4.py +++ /dev/null @@ -1,480 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using self.canvas embedded in Tk application - - Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -from globals import * - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - v = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - v.append(vol) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - del v[0] - return (x, y, v) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - -class Lohi: - """Time series online low and high detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.advance_count = 1 - self.ylow = None - self.yhigh = None - self.fiblo = None - self.fibhi = None - self.fibs = None - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # volume - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("volume") - # ax3.set_ylabel("cash") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - major_loc = mdates.MinuteLocator(byminute = range(0, 60, 10)) - minor_loc = mdates.MinuteLocator() - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.xaxis.set_major_locator(major_loc) - self.ax1.xaxis.set_minor_locator(minor_loc) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 29)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.xs, self.ys) - - self.mmh = Lohi(5) - - self.w0 = 0 - self.wd = 2000 - self.low_high_crs = 0 - xr, yr, vr = self.tick_window(self.w0, self.wd) - self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0]) - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-') - self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-') - # Acp markers - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - self.dl, = self.ax2.plot_date(xr, vr, '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr, vr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.fib_low_high(self.low_high_crs) - self.low_high_crs += 1 - # build polynomial fit - lohis = self.mmh.lohis - if len(lohis) >= 4: - n0, x0, y0 = lohis[-4] - n1, x1, y1 = lohis[-1] - x2 = xr[-1] - coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1) - self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)]) - # width of trend channel - mx = 0 - for n in range(n0, n1): - mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n])) - a, b = coefs - self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)]) - self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)]) - # update tick line - self.tl.set_data(xr, yr) - self.dl.set_data(xr, vr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.xs) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], 0, 50000]) - - def tick_window(self, w0, wd = 1000): - return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd]) - - def fib_low_high(self, n): - tick = (n, self.xs[n], self.ys[n]) - n, x, y = tick - hin, hix, hiy = self.fibhi - lon, lox, loy = self.fiblo - delta = hiy - loy - # 61.8, 50.0, 38.2, 23.6 % - y61 = loy + delta * 0.618 - y50 = loy + delta * 0.50 - y38 = loy + delta * 0.382 - y23 = loy + delta * 0.236 - if y < self.fiblo[2]: - self.fiblo = tick - if y > self.fibhi[2]: - self.fibhi = tick - if self.fibs is not None: - if lox > hix and y > y50: - self.fibs = None - self.fibhi = tick - if lox < hix and y < y50: - self.fibs = None - self.fiblo = tick - # create fib lines if lo hi differs more than 10 pts - if delta > 10: - xr = (min(lox, hix), x) - if self.fibs is None: - l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-') - l61, = self.ax1.plot_date(xr, (y61, y61), 'r--') - l50, = self.ax1.plot_date(xr, (y50, y50), 'r--') - l38, = self.ax1.plot_date(xr, (y38, y38), 'r--') - l23, = self.ax1.plot_date(xr, (y23, y23), 'r--') - l0, = self.ax1.plot_date(xr, (loy, loy), 'r-') - self.fibs = (l100, l61, l50, l38, l23, l0) - else: - l100, l61, l50, l38, l23, l0 = self.fibs - l100.set_data(xr, (hiy, hiy)) - l61.set_data(xr, (y61, y61)) - l50.set_data(xr, (y50, y50)) - l38.set_data(xr, (y38, y38)) - l23.set_data(xr, (y23, y23)) - l0.set_data(xr, (loy, loy)) - - def mark_low_high(self, n): - x = self.xs - y = self.ys - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - self.resume() - - def times_five(self): - self.advance_count = 5 - self.resume() - - def times_ten(self): - self.advance_count = 10 - self.resume() - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/globals.py b/mpl/globals.py deleted file mode 100644 index cfee0d2..0000000 --- a/mpl/globals.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (c) 2009 Andreas Balogh -# See LICENSE for details. - -""" Globals - -Global variables - -Note: this module implements the singleton pattern. -""" - -# system imports - -# local imports - -# constants - -RTTRD_VAR = "d:\\rttrd-prd-var" -PAD_DATA = "d:\\rttrd-dev-var\\analysis\\data" diff --git a/mpl/mpl-blit.py b/mpl/mpl-blit.py deleted file mode 100644 index 615e7a8..0000000 --- a/mpl/mpl-blit.py +++ /dev/null @@ -1,174 +0,0 @@ -# Copyright (c) 2009 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using blit method -""" - -# system imports - -import Tkinter as tk -import datetime -import os -import re -import logging -import sys - -import matplotlib.pyplot as plt -from matplotlib.dates import date2num -import matplotlib.dates as mdates -import numpy as np - -# local imports - -# constants - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - - -def main(): - LOG.debug("Loading ticks...") - x, y = tdl(datetime.datetime(2009,6,3)) - LOG.debug("Ticks loaded.") - - fig = plt.figure() - ax1 = fig.add_subplot(311) # ticks - canvas = ax1.figure.canvas - # ax2 = fig.add_subplot(312) # gearing - # ax3 = fig.add_subplot(313) # cash - - ax1.set_ylabel("ticks") - # ax2.set_ylabel("gearing") - # ax3.set_ylabel("cash") - - xr = [x[0]] * 500 - yr = [y[0]] * 500 - - line, = ax1.plot_date(xr, yr, '-', xdate = True) - major_fmt = mdates.DateFormatter('%H:%M:%S') - ax1.xaxis.set_major_formatter(major_fmt) - # ax1.axis('tight') - - # format the coords message box - def price(x): return '%1.2f'%x - # ax.format_xdata = mdates.DateFormatter('%Y-%m-%d') - ax1.format_xdata = mdates.DateFormatter('%H:%M:%S') - ax1.format_ydata = price - ax1.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - background = canvas.copy_from_bbox(ax1.bbox) - - def animate(): - w = 1000 - bias = 10 - - ymin = min(y[0:100]) - ymax = max(y[0:100]) - - low = ymin - bias - high = ymax + bias - trend = 0 - - for i in range(0, len(x)-w): - # restore the clean slate background - canvas.restore_region(background) - # prepare timeline window - xr = x[i:i+w] - yr = y[i:i+w] - # update line - line.set_xdata(xr) - line.set_ydata(yr) - # determine y axis - if yr[-1] > ymax: - ymax = yr[-1] - if ymax - 50 < min(yr): - ymin = ymax - 50 - if yr[-1] < ymin: - ymin = yr[-1] - if ymin + 50 > max(yr): - ymax = ymin + 50 - ax1.axis([xr[0], xr[-1], ymin, ymax]) - # check low high and annotate - if i > 0 and i % 150 == 0: - ax1.annotate('low/high', - xy = (xr[-10], yr[-10]), - xytext = (xr[-5], yr[-10] + 10), - arrowprops = dict(facecolor = 'black', - frac = 0.2, - headwidth = 10, - linewidth = 0.1, - shrink = 0.05)) - - # just draw the animated artist - # FIXME: redraw all items in graph - ax1.draw_artist(line) - # just redraw the axes rectangle - canvas.blit(ax1.bbox) - - - - root = fig.canvas.manager.window - root.after(100, animate) - plt.show() - - -if __name__ == "__main__": - main() diff --git a/mpl/mpl-draw.py b/mpl/mpl-draw.py deleted file mode 100644 index 3f139ee..0000000 --- a/mpl/mpl-draw.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using draw method -""" - -# system imports - -import Tkinter as tk -import datetime -import os -import re -import logging -import sys - -import matplotlib.pyplot as plt -from matplotlib.dates import date2num -import matplotlib.dates as mdates -import numpy as np - -# local imports - -# constants - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - - -def main(): - LOG.debug("Loading ticks...") - x, y = tdl(datetime.datetime(2009,6,3)) - LOG.debug("Ticks loaded.") - - fig = plt.figure() - ax1 = fig.add_subplot(311) # ticks - # ax2 = fig.add_subplot(312) # gearing - # ax3 = fig.add_subplot(313) # cash - - ax1.set_ylabel("ticks") - # ax2.set_ylabel("gearing") - # ax3.set_ylabel("cash") - - xr = [x[0]] * 500 - yr = [y[0]] * 500 - - line, = ax1.plot_date(xr, yr, '-', xdate = True) - major_fmt = mdates.DateFormatter('%H:%M:%S') - ax1.xaxis.set_major_formatter(major_fmt) - # ax1.axis('tight') - - # format the coords message box - def price(x): return '%1.2f'%x - # ax.format_xdata = mdates.DateFormatter('%Y-%m-%d') - ax1.format_xdata = mdates.DateFormatter('%H:%M:%S') - ax1.format_ydata = price - ax1.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - def animate(): - w = 1000 - bias = 10 - - ymin = min(y[0:100]) - ymax = max(y[0:100]) - - low = ymin - bias - high = ymax + bias - trend = 0 - - for i in range(0, len(x)-w): - # prepare timeline window - xr = x[i:i+w] - yr = y[i:i+w] - # update line - line.set_xdata(xr) - line.set_ydata(yr) - # determine y axis - if yr[-1] > ymax: - ymax = yr[-1] - if ymax - 50 < min(yr): - ymin = ymax - 50 - if yr[-1] < ymin: - ymin = yr[-1] - if ymin + 50 > max(yr): - ymax = ymin + 50 - ax1.axis([xr[0], xr[-1], ymin, ymax]) - # check low high and annotate - if i > 0 and i % 150 == 0: - ax1.annotate('low/high', - xy = (xr[-10], yr[-10]), - xytext = (xr[-5], yr[-10] + 10), - arrowprops = dict(facecolor = 'black', - frac = 0.2, - headwidth = 10, - linewidth = 0.1, - shrink = 0.05)) - - fig.canvas.draw() - - root = fig.canvas.manager.window - root.after(100, animate) - plt.show() - - -if __name__ == "__main__": - main() diff --git a/mpl/mpl-embedded.py b/mpl/mpl-embedded.py deleted file mode 100644 index 2d5de0f..0000000 --- a/mpl/mpl-embedded.py +++ /dev/null @@ -1,330 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" animated drawing of ticks - - using self.canvas embedded in Tk application - - Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max -""" - -# system imports - -import datetime -import os -import re -import logging -import sys -import warnings - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg -from matplotlib.figure import Figure -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -# constants - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - return (x, y) - - -class Mmh: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.mms = [ ] - self.mins = [ ] - self.maxs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initalise water mark - self.mm0 = tick - res = self.mm0 - self.mins = [(n, cdt, last - 1)] - self.maxs = [(n, cdt, last + 1)] - else: - # initalise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.mms.insert(0, self.mm0) - self.maxs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.mms.insert(0, self.mm0) - self.mins.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return res - - -class Main: - def __init__(self): - LOG.debug("Loading ticks...") - self.x, self.y = tdl(datetime.datetime(2009,6,3)) - LOG.debug("Ticks loaded.") - self.mmh = Mmh(10) - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - fig = plt.figure() - self.ax1 = fig.add_subplot(111) # ticks - # ax2 = fig.add_subplot(312) # gearing - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - # ax2.set_ylabel("gearing") - # ax3.set_ylabel("cash") - - self.w = 1000 - self.bias = 10 - - xr = self.x[0:self.w] - yr = self.y[0:self.w] - - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit, ) * self.w, 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'g:') - self.ml, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'r:') - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - - self.ax1.format_xdata = mdates.DateFormatter('%H:%M:%S') - self.ax1.format_ydata = lambda x: '%1.2f'%x - self.ax1.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - self.ax1.axis([xr[0], xr[-1]+180./86400., min(yr), max(yr)]) - - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master = self.root) - bu1 = Tk.Button(master = fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master = fr1, text='Stop', command=self.stop) - bu3 = Tk.Button(master = fr1, text='Resume', command=self.resume) - bu1.pack(side = Tk.RIGHT, padx = 5, pady = 5) - bu2.pack(side = Tk.RIGHT, padx = 5, pady = 5) - bu3.pack(side = Tk.RIGHT, padx = 5, pady = 5) - fr1.pack(side = Tk.BOTTOM) - - def animate_start(self): - warnings.simplefilter("default", np.RankWarning) - - self.ymin = min(self.y[0:self.w]) - self.ymax = max(self.y[0:self.w]) - - self.low = self.ymin - self.bias - self.high = self.ymax + self.bias - self.trend = 0 - self.i = 0 - for n in range(0, self.w): - self.mark_low_high(n) - self.root.after(500, self.animate_step) - - def animate_step(self): - # prepare timeline window - xr = np.array( self.x[self.i:self.i+self.w] ) - yr = np.array( self.y[self.i:self.i+self.w] ) - # update line - self.tl.set_xdata(xr) - self.tl.set_ydata(yr) - # determine y axis - if yr[-1] > self.ymax: - self.ymax = yr[-1] - if self.ymax - 50 < min(yr): - self.ymin = self.ymax - 50 - if yr[-1] < self.ymin: - self.ymin = yr[-1] - if self.ymin + 50 > max(yr): - self.ymax = self.ymin + 50 - # check self.low self.high and annotate - fwd = 2 - for n in range(self.i+self.w, self.i+self.w+fwd): - self.mark_low_high(n) - self.i += fwd - # build polynomial fit - mms = self.mmh.mms - if len(mms) > 1: - # mx = [ (x - int(x)) * 86400 for n, x, y in mms[:4] ] - # my = [ y for n, x, y in mms[:4] ] - mx = [ (x - int(x)) * 86400 for x in xr ] - my = yr - xre = [ xr[-1] + x/86400. for x in range(1, 181)] - xr2 = np.append(xr, xre) - # print "mx: ", mx - # print "my: ", my - polyval = np.polyfit(mx, my, 30) - # print "poly1d: ", polyval - intx = np.array(xr, dtype = int) - sodx = xr - intx - sodx *= 86400 - s2x = np.append(sodx, range(sodx[-1], sodx[-1]+180)) - fit = np.polyval(polyval, s2x) - self.fl.set_xdata(xr2) - self.fl.set_ydata(fit) - maxs = self.mmh.maxs - if len(maxs) > 1: - n, x1, y1 = maxs[-2] - n, x2, y2 = maxs[-1] - x3 = xr[-1] - polyfit = np.polyfit((x1, x2), (y1, y2), 1) - y3 = np.polyval(polyfit, x3) - self.mh.set_data((x1, x2, x3), (y1, y2, y3)) - mins = self.mmh.mins - if len(mins) > 1: - n, x1, y1 = mins[-2] - n, x2, y2 = mins[-1] - x3 = xr[-1] - polyfit = np.polyfit((x1, x2), (y1, y2), 1) - y3 = np.polyval(polyfit, x3) - self.ml.set_data((x1, x2, x3), (y1, y2, y3)) - # draw - self.ax1.axis([xr[0], xr[-1]+180./86400., self.ymin, self.ymax]) - self.canvas.draw() - if self.i < len(self.x)-self.w-1: - self.after_id = self.root.after(10, self.animate_step) - - def build_fit(self): - pass - - def mark_low_high(self, n): - x = self.x - y = self.y - rc = self.mmh((n, x[n], y[n])) - if rc: - nlh, xlh, ylh = rc - if self.mmh.trend > 0: - # low - self.ax1.annotate('low', - xy = (x[nlh], y[nlh]), - xytext = (x[n], y[nlh]), - arrowprops = dict(facecolor = 'red', - shrink = 0.05)) - # an.set_annotation_clip(True) - elif self.mmh.trend < 0: - # high - self.ax1.annotate('high', - xy = (x[nlh], y[nlh]), - xytext = (x[n], y[nlh]), - arrowprops = dict(facecolor = 'green', - shrink = 0.05)) - # an.set_annotation_clip(True) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate_step) - - def run(self): - self.root.after(500, self.animate_start) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/pad1.py b/mpl/pad1.py deleted file mode 100644 index f61ece6..0000000 --- a/mpl/pad1.py +++ /dev/null @@ -1,499 +0,0 @@ -# Copyright (c) 2008 Andreas Balogh -# See LICENSE for details. - -""" patterns and distance - -A. collect data -1. one tick every minute 10-20 mins back -2. use LoHi max until market close as performance indicator - -B. cluster and analyse data according to distance -1. find clusters with net positive P&L. many clusters will exhibit useless patterns - -C. check performance with backtest - - -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib -matplotlib.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -from globals import * - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - v = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - v.append(vol) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - del v[0] - return (x, y, v) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - -class Lohi: - """Time series online low and high detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - ONE_MINUTE and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -def harvest_patterns(): - LOG.debug("Loading ticks...") - xs, ys, vs = tdl(datetime.datetime(2009, 6, 25)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(xs, ys) - - -def analyse_patterns(): - pass - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.advance_count = 1 - self.ylow = None - self.yhigh = None - self.fiblo = None - self.fibhi = None - self.fibs = None - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # volume - # ax3 = fig.add_subplot(313) # cash - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("volume") - # ax3.set_ylabel("cash") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - major_loc = mdates.MinuteLocator(byminute = range(0, 60, 10)) - minor_loc = mdates.MinuteLocator() - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.xaxis.set_major_locator(major_loc) - self.ax1.xaxis.set_minor_locator(minor_loc) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 25)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.xs, self.ys) - - self.mmh = Lohi(5) - - self.w0 = 0 - self.wd = 2000 - self.low_high_crs = 0 - xr, yr, vr = self.tick_window(self.w0, self.wd) - self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0]) - fit = np.average(yr) - - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--') - self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-') - self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-') - # Acp markers - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - self.dl, = self.ax2.plot_date(xr, vr, '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr, vr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.fib_low_high(self.low_high_crs) - self.low_high_crs += 1 - # build polynomial fit - lohis = self.mmh.lohis - if len(lohis) >= 4: - n0, x0, y0 = lohis[-4] - n1, x1, y1 = lohis[-1] - x2 = xr[-1] - coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1) - self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)]) - # width of trend channel - mx = 0 - for n in range(n0, n1): - mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n])) - a, b = coefs - self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)]) - self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)]) - # update tick line - self.tl.set_data(xr, yr) - self.dl.set_data(xr, vr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.xs) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], 0, 50000]) - - def tick_window(self, w0, wd = 1000): - return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd]) - - def fib_low_high(self, n): - tick = (n, self.xs[n], self.ys[n]) - redraw = False - n, x, y = tick - hin, hix, hiy = self.fibhi - lon, lox, loy = self.fiblo - delta = hiy - loy - # 61.8, 50.0, 38.2, 23.6 % - y61 = loy + delta * 0.618 - y50 = loy + delta * 0.50 - y38 = loy + delta * 0.382 - y23 = loy + delta * 0.236 - if y < self.fiblo[2]: - self.fiblo = tick - if y > self.fibhi[2]: - self.fibhi = tick - if self.fibs is not None: - if lox > hix and y > y50: - self.fibs = None - self.fibhi = tick - if lox < hix and y < y50: - self.fibs = None - self.fiblo = tick - # create fib lines if lo hi differs more than 10 pts - if delta > 10: - xr = (min(lox, hix), x) - if self.fibs is None: - l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-') - l61, = self.ax1.plot_date(xr, (y61, y61), 'r--') - l50, = self.ax1.plot_date(xr, (y50, y50), 'r--') - l38, = self.ax1.plot_date(xr, (y38, y38), 'r--') - l23, = self.ax1.plot_date(xr, (y23, y23), 'r--') - l0, = self.ax1.plot_date(xr, (loy, loy), 'r-') - self.fibs = (l100, l61, l50, l38, l23, l0) - else: - l100, l61, l50, l38, l23, l0 = self.fibs - l100.set_data(xr, (hiy, hiy)) - l61.set_data(xr, (y61, y61)) - l50.set_data(xr, (y50, y50)) - l38.set_data(xr, (y38, y38)) - l23.set_data(xr, (y23, y23)) - l0.set_data(xr, (loy, loy)) - - def mark_low_high(self, n): - x = self.xs - y = self.ys - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - self.resume() - - def times_five(self): - self.advance_count = 5 - self.resume() - - def times_ten(self): - self.advance_count = 10 - self.resume() - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/sw-trend1.py b/mpl/sw-trend1.py deleted file mode 100644 index 5bcad37..0000000 --- a/mpl/sw-trend1.py +++ /dev/null @@ -1,594 +0,0 @@ -# Copyright (c) 2009 Andreas Balogh -# See LICENSE for details. - -""" -Online sliding window with trend analysis - -1. segment tick data with a sliding window alogrithm -2. recognise low/high points by comparing slope information -3. recognise trend by observing low/high point difference -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib as mpl -mpl.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -from globals import * - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - v = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - v.append(vol) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - del v[0] - return (x, y, v) - -def interpolate_line(xs, ys): - """Fit a straight line y = bx + a to a set of points (x, y) """ - # from two data points only! - x1, x2 = xs - y1, y2 = ys - try: - b = ( y2 - y1 ) / ( x2 - x1 ) - except ZeroDivisionError: - print "interpolate_line: division by zero, ", x1, x2, y1, y2 - b = 0.0 - a = y1 - b * x1 - return (b, a) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - -class TimedLohi: - """Time series online low and high detector. - - Confirms low/high candidates after timeout. - Time dependent. - """ - def __init__(self, bias, timeout = ONE_MINUTE): - assert(bias > 0) - self.bias = bias - self.timeout = timeout - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - self.timeout and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - self.timeout and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class SlidingWindow: - """Douglas-Peucker algorithm.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.xs = [ ] - self.ys = [ ] - self.segx = [ ] - self.segy = [ ] - self.types = [ ] - self.bs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - max_distance = self.bias - rc = None - self.xs.append(cdt) - self.ys.append(last) - x0, y0 = (self.xs[0], self.ys[0]) - x1, y1 = (self.xs[-1], self.ys[-1]) - if n == 0: - self.segx.append(x0) - self.segy.append(y0) - if len(self.xs) < 2: - return None - # check distance - coefs = interpolate_line((x0, x1), (y0, y1)) - ip_ys = np.polyval(coefs, self.xs) - d_ys = np.absolute(self.ys - ip_ys) - d_max = np.amax(d_ys) - if d_max > max_distance: - n = np.argmax(d_ys) - x2, y2 = (self.xs[n], self.ys[n]) - self.segx.append(x2) - self.segy.append(y2) - segment_added = True - # store slope of segment - b0, a0 = interpolate_line((x0, x2), (y0, y2)) - self.bs.append(b0) - # remove ticks of previous segment - del self.xs[0:n] - del self.ys[0:n] - # slope of current segment - x0, y0 = (self.xs[0], self.ys[0]) - b1, a1 = interpolate_line((x0, x1), (y0, y1)) - lohi = self.get_type(b0, b1) - rc = (x2, y2, lohi) - return (self.segx + [x1], self.segy + [y1], rc) - - def get_type(self, b0, b1): - """ calculate gearing - y: previous slope, x: current slope - <0 ~0 >0 - <0 L L L - ~0 H 0 L - >0 H H H - """ - if b0 < -SMALL and b1 < -SMALL and b0 > b1: - lohi = "d+" - elif b0 < -SMALL and b1 < SMALL and b0 < b1: - lohi = "d-" - elif b0 < -SMALL and b1 > SMALL: - lohi = "L" - elif abs(b0) < SMALL and b1 < -SMALL: - lohi = "d+" - elif abs(b0) < SMALL and abs(b1) < SMALL: - lohi = "0" - elif abs(b0) < SMALL and b1 > SMALL: - lohi = "u+" - elif b0 > SMALL and b1 < -SMALL: - lohi = "H" - elif b0 > SMALL and b1 > -SMALL and b0 > b1: - lohi = "u-" - elif b0 > SMALL and b1 > SMALL and b0 < b1: - lohi = "u+" - else: - lohi = "?" - return lohi - - -SMALL = 1E-10 - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.advance_count = 10 - self.ylow = None - self.yhigh = None - self.segs = [ ] - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - # self.ax2 = fig.add_subplot(312) # slope of line segement - self.ax3 = fig.add_subplot(212) # moving average (10min) - - self.ax1.set_ylabel("ticks") - # self.ax2.set_ylabel("slope") - self.ax3.set_ylabel("gearing") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - """ - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - """ - - self.ax3.xaxis.set_major_formatter(major_fmt) - self.ax3.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax3.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax3.format_xdata = major_fmt - self.ax3.format_ydata = lambda x: '%1.2f' % x - self.ax3.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.xs, self.ys) - self.mas = self.ys[:] - self.ss = [ 0 ] * len(self.xs) - self.gs = [ 0 ] * len(self.xs) - - self.mmh = TimedLohi(5) - self.osw = SlidingWindow(2) - - self.w0 = 0 - self.wd = 2000 - self.low_high_crs = 0 - xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd) - self.n0 = 0 - - # top subplot - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-') - # Acp markers - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - - # volume subplot - # self.dl, = self.ax2.plot_date(xr, vr, '-') - self.dl, = self.ax1.plot_date(xr, vr, 'g-') - - # slope subplot - # self.sl, = self.ax2.plot_date(xr, sr, '-') - - # gearing subplot - self.gl, = self.ax3.plot_date(xr, gr, '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd) - while self.low_high_crs < self.w0 + self.wd: - # self.mark_low_high(self.low_high_crs) - self.mark_segments(self.low_high_crs) - self.ma(self.low_high_crs, 10) - self.low_high_crs += 1 - # update tick line - self.tl.set_data(xr, yr) - # update segment slope - # self.sl.set_data(xr, sr) - # update volume line - self.dl.set_data(xr, vr) - # gearing line - self.gl.set_data(xr, gr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.xs) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - # self.ax2.axis([xr[0], xr[-1], -5, +5]) - self.ax3.axis([xr[0], xr[-1], -50, +50]) - - def tick_window(self, w0, wd = 1000): - return (self.xs[w0:w0 + wd], - self.ys[w0:w0 + wd], - self.mas[w0:w0 + wd], - self.ss[w0:w0 + wd], - self.gs[w0:w0+wd]) - - def ma(self, n0, min): - self.mas[n0] = np.average(self.ys[n0-min*60:n0]) - self.gs[n0] = self.ys[n0] - self.mas[n0] + self.ss[n0] - - def mark_segments(self, n): - x = self.xs - y = self.ys - rc = self.osw((n, x[n], y[n])) - if rc is not None: - segx, segy, lohi = rc - self.seg.set_data(segx, segy) - if lohi is not None: - text = lohi[2] - if text == "u+": - fc = "blue" - dy = -15 - elif text == "d+": - fc = "blue" - dy = +15 - elif text == "H": - fc = "green" - dy = +15 - elif text == "L": - fc = "red" - dy = -15 - else: - fc = None - if fc: - self.ax1.annotate(text, - xy=(lohi[0], lohi[1]), - xytext=(segx[-1], segy[-2]+dy), - arrowprops=dict(facecolor=fc, - frac=0.3, - shrink=0.1)) - - def mark_low_high(self, n): - x = self.xs - y = self.ys - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - self.resume() - - def times_five(self): - self.advance_count = 5 - self.resume() - - def times_ten(self): - self.advance_count = 10 - self.resume() - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run() diff --git a/mpl/sw-trend2.py b/mpl/sw-trend2.py deleted file mode 100644 index 3ed7ca0..0000000 --- a/mpl/sw-trend2.py +++ /dev/null @@ -1,604 +0,0 @@ -# Copyright (c) 2009 Andreas Balogh -# See LICENSE for details. - -""" -Online sliding window with trend analysis - -1. segment tick data with a sliding window alogrithm -2. recognise low/high points by comparing slope information -3. recognise trend by observing low/high point difference -""" - -# system imports - -import datetime -import os -import re -import logging -import warnings -import math - -import Tkinter as Tk -import numpy as np - -import matplotlib as mpl -mpl.use('TkAgg') - -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -import matplotlib.pyplot as plt -import matplotlib.dates as mdates -from matplotlib.dates import date2num - -# local imports - -from namedtuple import NamedTuple -from globals import * - -# constants - -ONE_MINUTE = 60. / 86400. -LOW, NONE, HIGH = range(-1, 2) - -Trend = NamedTuple('Trend', 'n x y') - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', - datefmt='%H:%M:%S') - -MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") - - -def tdl(tick_date): - """ returns a list of tick tuples (cdt, last) for specified day """ - fiid = "846900" - year = tick_date.strftime("%Y") - yyyymmdd = tick_date.strftime("%Y%m%d") - filename = "%s.csv" % (fiid) - filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) - x = [ ] - y = [ ] - v = [ ] - fh = open(filepath, "r") - try: - prev_last = "" - for line in fh: - flds = line.split(",") - # determine file version - if flds[2] == "LAST": - last = float(flds[3]) - vol = float(flds[4]) - else: - last = float(flds[4]) - vol = 0.0 - # skip ticks with same last price - if prev_last == last: - continue - else: - prev_last = last - # parse time - mobj = MDF_REO.match(flds[0]) - if mobj is None: - raise ValueError("no match for [%s]" % (flds[0],)) - (hh, mm, ss, ms) = mobj.groups() - if ms: - c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) - else: - c_time = datetime.time(int(hh), int(mm), int(ss)) - cdt = datetime.datetime.combine(tick_date, c_time) - x.append(date2num(cdt)) - y.append(last) - v.append(vol) - finally: - fh.close() - # throw away first line of file (close price from previous day) - del x[0] - del y[0] - del v[0] - return (x, y, v) - -def interpolate_line(xs, ys): - """Fit a straight line y = bx + a to a set of points (x, y) """ - # from two data points only! - x1, x2 = xs - y1, y2 = ys - try: - b = ( y2 - y1 ) / ( x2 - x1 ) - except ZeroDivisionError: - print "interpolate_line: division by zero, ", x1, x2, y1, y2 - b = 0.0 - a = y1 - b * x1 - return (b, a) - -def num2sod(x): - frac, integ = math.modf(x) - return frac * 86400 - - -class Bunch: - def __init__(self, **kwds): - self.__dict__.update(kwds) - - -class TimedLohi: - """Time series online low and high detector. - - Confirms low/high candidates after timeout. - Time dependent. - """ - def __init__(self, bias, timeout = ONE_MINUTE): - assert(bias > 0) - self.bias = bias - self.timeout = timeout - self.low0 = None - self.high0 = None - self.prev_lohi = NONE - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.low0 is None: - self.low0 = tick - self.lows.append((n, cdt, last - 1)) - if self.high0 is None: - self.high0 = tick - self.highs.append((n, cdt, last + 1)) - if last > self.high0[2]: - self.high0 = tick - if self.prev_lohi == NONE: - if self.high0[2] > self.low0[2] + self.bias: - res = self.high0 - self.low0 = self.high0 - self.lows.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if last < self.low0[2]: - self.low0 = tick - if self.prev_lohi == NONE: - if self.low0[2] < self.high0[2] - self.bias: - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if self.high0[1] < cdt - self.timeout and \ - ((self.prev_lohi == LOW and \ - self.high0[2] > self.lows[-1][2] + self.bias) or - (self.prev_lohi == HIGH and \ - self.high0[2] > self.highs[-1][2])): - res = self.high0 - self.low0 = self.high0 - self.highs.append(self.high0) - self.lohis.append(self.high0) - self.prev_lohi = HIGH - if self.low0[1] < cdt - self.timeout and \ - ((self.prev_lohi == LOW and \ - self.low0[2] < self.lows[-1][2]) or - (self.prev_lohi == HIGH and \ - self.low0[2] < self.highs[-1][2] - self.bias)): - res = self.low0 - self.high0 = self.low0 - self.lows.append(self.low0) - self.lohis.append(self.low0) - self.prev_lohi = LOW - if res: - return (self.prev_lohi, res) - else: - return None - - -def find_lows_highs(xs, ys): - dacp = DelayedAcp(10) - for tick in zip(range(len(xs)), xs, ys): - dacp(tick) - return dacp.lows, dacp.highs - - -class DelayedAcp: - """Time series max & min detector.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.trend = None - self.mm0 = None - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - res = None - # automatic initialisation - if self.mm0 is None: - # initialise water mark - self.mm0 = tick - res = self.mm0 - self.lows = [(n, cdt, last - 1)] - self.highs = [(n, cdt, last + 1)] - else: - # initialise trend until price has changed - if self.trend is None or self.trend == 0: - self.trend = cmp(last, self.mm0[2]) - # check for max - if self.trend > 0: - if last > self.mm0[2]: - self.mm0 = tick - if last < self.mm0[2] - self.bias: - self.lohis.append(self.mm0) - self.highs.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = -1 - # check for min - if self.trend < 0: - if last < self.mm0[2]: - self.mm0 = tick - if last > self.mm0[2] + self.bias: - self.lohis.append(self.mm0) - self.lows.append(self.mm0) - res = self.mm0 - # revert trend & water mark - self.mm0 = tick - self.trend = +1 - return (cmp(self.trend, 0), res) - - -class SlidingWindow: - """Douglas-Peucker algorithm.""" - def __init__(self, bias): - assert(bias > 0) - self.bias = bias - self.xs = [ ] - self.ys = [ ] - self.segx = [ ] - self.segy = [ ] - self.types = [ ] - self.bs = [ ] - - def __call__(self, tick): - """Add extended tick to the max min parser. - - @param tick: The value of the current tick. - @type tick: tuple(n, cdt, last) - - @return: 1. Tick if new max min has been detected, - 2. None otherwise. - """ - n, cdt, last = tick - max_distance = self.bias - rc = None - self.xs.append(cdt) - self.ys.append(last) - x0, y0 = (self.xs[0], self.ys[0]) - x1, y1 = (self.xs[-1], self.ys[-1]) - if n == 0: - self.segx.append(x0) - self.segy.append(y0) - if len(self.xs) < 2: - return None - # check distance - coefs = interpolate_line((x0, x1), (y0, y1)) - ip_ys = np.polyval(coefs, self.xs) - d_ys = np.absolute(self.ys - ip_ys) - d_max = np.amax(d_ys) - if d_max > max_distance: - n = np.argmax(d_ys) - x2, y2 = (self.xs[n], self.ys[n]) - self.segx.append(x2) - self.segy.append(y2) - segment_added = True - # store slope of segment - b0, a0 = interpolate_line((x0, x2), (y0, y2)) - self.bs.append(b0) - # remove ticks of previous segment - del self.xs[0:n] - del self.ys[0:n] - # slope of current segment - x0, y0 = (self.xs[0], self.ys[0]) - b1, a1 = interpolate_line((x0, x1), (y0, y1)) - lohi = self.get_type(b0, b1) - rc = (x2, y2, lohi) - return (self.segx + [x1], self.segy + [y1], rc) - - def get_type(self, b0, b1): - """ calculate gearing - y: previous slope, x: current slope - <0 ~0 >0 - <0 L L L - ~0 H 0 L - >0 H H H - """ - if b0 < -SMALL and b1 < -SMALL and b0 > b1: - lohi = "d+" - elif b0 < -SMALL and b1 < SMALL and b0 < b1: - lohi = "d-" - elif b0 < -SMALL and b1 > SMALL: - lohi = "L" - elif abs(b0) < SMALL and b1 < -SMALL: - lohi = "d+" - elif abs(b0) < SMALL and abs(b1) < SMALL: - lohi = "0" - elif abs(b0) < SMALL and b1 > SMALL: - lohi = "u+" - elif b0 > SMALL and b1 < -SMALL: - lohi = "H" - elif b0 > SMALL and b1 > -SMALL and b0 > b1: - lohi = "u-" - elif b0 > SMALL and b1 > SMALL and b0 < b1: - lohi = "u+" - else: - lohi = "?" - return lohi - - -SMALL = 1E-10 - - -class Main: - def __init__(self): - warnings.simplefilter("default", np.RankWarning) - self.advance_count = 10 - self.ylow = None - self.yhigh = None - self.trend_starts = None - self.segs = [ ] - - self.root = Tk.Tk() - self.root.wm_title("Embedding in TK") - - # create plot - fig = plt.figure() - self.ax1 = fig.add_subplot(211) # ticks - self.ax2 = fig.add_subplot(212) # moving average (10min) - - self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("gearing") - - major_fmt = mdates.DateFormatter('%H:%M:%S') - self.ax1.xaxis.set_major_formatter(major_fmt) - self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax1.format_xdata = major_fmt - self.ax1.format_ydata = lambda x: '%1.2f' % x - self.ax1.grid(True) - - self.ax2.xaxis.set_major_formatter(major_fmt) - self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) - self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) - self.ax2.format_xdata = major_fmt - self.ax2.format_ydata = lambda x: '%1.2f' % x - self.ax2.grid(True) - - # rotates and right aligns the x labels, and moves the bottom of the - # axes up to make room for them - fig.autofmt_xdate() - - # create artists - LOG.debug("Loading ticks...") - self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1)) - LOG.debug("Ticks loaded.") - lows, highs = find_lows_highs(self.xs, self.ys) - self.mas = self.ys[:] - - self.mmh = TimedLohi(5) - self.osw = SlidingWindow(2) - - self.w0 = 0 - self.wd = 2000 - self.w_crs = 0 - xr, yr, mar = self.tick_window(self.w0, self.wd) - self.gr = [0.0] * self.wd - - # add artists to top subplot - # tick line and segments - self.tl, = self.ax1.plot_date(xr, yr, '-') - self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-') - # Acp markers - self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') - self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') - # trend lines - self.trd, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k--') - self.trh, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-') - self.trl, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-') - - # add artists to bottom subplot - self.gl, = self.ax2.plot_date(xr, self.gr, '-') - - self.set_axis(xr, yr) - - # embed canvas in Tk - self.canvas = FigureCanvasTkAgg(fig, master=self.root) - self.canvas.draw() - self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) - - # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) - # toolbar.update() - # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) - - fr1 = Tk.Frame(master=self.root) - bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) - bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) - bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) - bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) - bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) - bu1.pack(side=Tk.RIGHT, padx=5, pady=5) - bu6.pack(side=Tk.RIGHT, padx=5, pady=5) - bu5.pack(side=Tk.RIGHT, padx=5, pady=5) - bu4.pack(side=Tk.RIGHT, padx=5, pady=5) - bu2.pack(side=Tk.RIGHT, padx=5, pady=5) - fr1.pack(side=Tk.BOTTOM) - - - def animate(self): - self.w0 += self.advance_count - # prepare timeline window - while self.w_crs < self.w0 + self.wd: - self.ma(self.w_crs, 10) - self.fitter(self.w_crs) - self.w_crs += 1 - xr, yr, mar = self.tick_window(self.w0, self.wd) - # update tick line - self.tl.set_data(xr, yr) - # gearing line - # self.gl.set_data(xr, gr) - # update axis - self.set_axis(xr, yr) - self.canvas.draw() - if self.w0 < len(self.xs) - self.wd - 1: - self.after_id = self.root.after(10, self.animate) - - def set_axis(self, xr, yr, bias=50): - if self.ylow is None: - self.ylow = yr[0] - bias / 2 - self.yhigh = yr[0] + bias / 2 - for y in yr: - if y < self.ylow: - self.ylow = y - self.yhigh = self.ylow + bias - if y > self.yhigh: - self.yhigh = y - self.ylow = self.yhigh - bias - self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], -50, +50]) - - def tick_window(self, w0, wd = 1000): - return (self.xs[w0:w0 + wd], - self.ys[w0:w0 + wd], - self.mas[w0:w0 + wd], - ) - - def ma(self, n0, min): - self.mas[n0] = np.average(self.ys[n0-min*60:n0]) - - def fitter(self, n0): - # find last low/high within t-1 - # linear regression from t-5 to t-1 - # linear regression within t-1 - # visual inspection - - # determine run-on low and highs - if self.trend_starts is None: - self.trend_starts = [Trend(n=n0, x=self.xs[n0], y=self.ys[n0])] - trend_start = self.trend_starts[-1] - # wait for 30 secs to stabilise - if trend_start.n + 30 > n0: - return - # fit trend - xr = self.xs[trend_start.n:n0] - yr = self.ys[trend_start.n:n0] - ps = np.polyfit(xr, yr, 1) - trend_xs = [xr[0], xr[-1]] - trend_ys = np.polyval(ps, trend_xs) - self.trd.set_data(trend_xs, trend_ys) - # fit counter trend - - - def mark_segments(self, n): - x = self.xs - y = self.ys - rc = self.osw((n, x[n], y[n])) - if rc is not None: - segx, segy, lohi = rc - self.seg.set_data(segx, segy) - if lohi is not None: - text = lohi[2] - if text == "u+": - fc = "blue" - dy = -15 - elif text == "d+": - fc = "blue" - dy = +15 - elif text == "H": - fc = "green" - dy = +15 - elif text == "L": - fc = "red" - dy = -15 - else: - fc = None - if fc: - self.ax1.annotate(text, - xy=(lohi[0], lohi[1]), - xytext=(segx[-1], segy[-2]+dy), - arrowprops=dict(facecolor=fc, - frac=0.3, - shrink=0.1)) - - def mark_low_high(self, n): - x = self.xs - y = self.ys - rc = self.mmh((n, x[n], y[n])) - if rc: - lohi, tick = rc - nlh, xlh, ylh = tick - if lohi < 0: - # low - self.ax1.annotate('low', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='red', - frac=0.3, - shrink=0.1)) - elif lohi > 0: - # high - self.ax1.annotate('high', - xy=(x[nlh], y[nlh]), - xytext=(x[n], y[nlh]), - arrowprops=dict(facecolor='green', - frac=0.3, - shrink=0.1)) - - def stop(self): - if self.after_id: - self.root.after_cancel(self.after_id) - self.after_id = None - - def resume(self): - if self.after_id is None: - self.after_id = self.root.after(10, self.animate) - - def times_one(self): - self.advance_count = 1 - self.resume() - - def times_five(self): - self.advance_count = 5 - self.resume() - - def times_ten(self): - self.advance_count = 10 - self.resume() - - def run(self): - self.root.after(500, self.animate) - self.root.mainloop() - self.root.destroy() - - -if __name__ == "__main__": - app = Main() - app.run()