# Copyright (c) 2008 Andreas Balogh # See LICENSE for details. """ animated drawing of ticks using self.canvas embedded in Tk application Fibionacci retracements of 61.8, 50.0, 38.2, 23.6 % of min and max """ # system imports import datetime import os import re import logging import sys import warnings import Tkinter as Tk import numpy as np import matplotlib matplotlib.use('TkAgg') from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg from matplotlib.figure import Figure import matplotlib.pyplot as plt import matplotlib.dates as mdates from matplotlib.dates import date2num # local imports # constants # globals LOG = logging.getLogger() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', datefmt='%H:%M:%S') MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") def tdl(tick_date): """ returns a list of tick tuples (cdt, last) for specified day """ fiid = "846900" year = tick_date.strftime("%Y") yyyymmdd = tick_date.strftime("%Y%m%d") filename = "%s.csv" % (fiid) filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename) x = [ ] y = [ ] fh = open(filepath, "r") try: prev_last = "" for line in fh: flds = line.split(",") # determine file version if flds[2] == "LAST": last = float(flds[3]) vol = float(flds[4]) else: last = float(flds[4]) vol = 0.0 # skip ticks with same last price if prev_last == last: continue else: prev_last = last # parse time mobj = MDF_REO.match(flds[0]) if mobj is None: raise ValueError("no match for [%s]" % (flds[0],)) (hh, mm, ss, ms) = mobj.groups() if ms: c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) else: c_time = datetime.time(int(hh), int(mm), int(ss)) cdt = datetime.datetime.combine(tick_date, c_time) x.append(date2num(cdt)) y.append(last) finally: fh.close() # throw away first line of file (close price from previous day) del x[0] del y[0] return (x, y) class Mmh: """Time series max & min detector.""" def __init__(self, bias): assert(bias > 0) self.bias = bias self.trend = None self.mm0 = None self.mms = [ ] self.mins = [ ] self.maxs = [ ] def __call__(self, tick): """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. """ n, cdt, last = tick res = None # automatic initialisation if self.mm0 is None: # initalise water mark self.mm0 = tick res = self.mm0 self.mins = [(n, cdt, last - 1)] self.maxs = [(n, cdt, last + 1)] else: # initalise trend until price has changed if self.trend is None or self.trend == 0: self.trend = cmp(last, self.mm0[2]) # check for max if self.trend > 0: if last > self.mm0[2]: self.mm0 = tick if last < self.mm0[2] - self.bias: self.mms.insert(0, self.mm0) self.maxs.append(self.mm0) res = self.mm0 # revert trend & water mark self.mm0 = tick self.trend = -1 # check for min if self.trend < 0: if last < self.mm0[2]: self.mm0 = tick if last > self.mm0[2] + self.bias: self.mms.insert(0, self.mm0) self.mins.append(self.mm0) res = self.mm0 # revert trend & water mark self.mm0 = tick self.trend = +1 return res class Main: def __init__(self): LOG.debug("Loading ticks...") self.x, self.y = tdl(datetime.datetime(2009,6,3)) LOG.debug("Ticks loaded.") self.mmh = Mmh(10) self.root = Tk.Tk() self.root.wm_title("Embedding in TK") fig = plt.figure() self.ax1 = fig.add_subplot(111) # ticks # ax2 = fig.add_subplot(312) # gearing # ax3 = fig.add_subplot(313) # cash self.ax1.set_ylabel("ticks") # ax2.set_ylabel("gearing") # ax3.set_ylabel("cash") self.w = 1000 self.bias = 10 xr = self.x[0:self.w] yr = self.y[0:self.w] fit = np.average(yr) self.tl, = self.ax1.plot_date(xr, yr, '-') self.fl, = self.ax1.plot_date(xr, (fit, ) * self.w, 'k--') self.mh, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'g:') self.ml, = self.ax1.plot_date(xr, (yr[0], ) * self.w, 'r:') major_fmt = mdates.DateFormatter('%H:%M:%S') self.ax1.xaxis.set_major_formatter(major_fmt) self.ax1.format_xdata = mdates.DateFormatter('%H:%M:%S') self.ax1.format_ydata = lambda x: '%1.2f'%x self.ax1.grid(True) # rotates and right aligns the x labels, and moves the bottom of the # axes up to make room for them fig.autofmt_xdate() self.ax1.axis([xr[0], xr[-1]+180./86400., min(yr), max(yr)]) self.canvas = FigureCanvasTkAgg(fig, master=self.root) self.canvas.draw() self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) # toolbar.update() # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) fr1 = Tk.Frame(master = self.root) bu1 = Tk.Button(master = fr1, text='Quit', command=self.root.quit) bu2 = Tk.Button(master = fr1, text='Stop', command=self.stop) bu3 = Tk.Button(master = fr1, text='Resume', command=self.resume) bu1.pack(side = Tk.RIGHT, padx = 5, pady = 5) bu2.pack(side = Tk.RIGHT, padx = 5, pady = 5) bu3.pack(side = Tk.RIGHT, padx = 5, pady = 5) fr1.pack(side = Tk.BOTTOM) def animate_start(self): warnings.simplefilter("default", np.RankWarning) self.ymin = min(self.y[0:self.w]) self.ymax = max(self.y[0:self.w]) self.low = self.ymin - self.bias self.high = self.ymax + self.bias self.trend = 0 self.i = 0 for n in range(0, self.w): self.mark_low_high(n) self.root.after(500, self.animate_step) def animate_step(self): # prepare timeline window xr = np.array( self.x[self.i:self.i+self.w] ) yr = np.array( self.y[self.i:self.i+self.w] ) # update line self.tl.set_xdata(xr) self.tl.set_ydata(yr) # determine y axis if yr[-1] > self.ymax: self.ymax = yr[-1] if self.ymax - 50 < min(yr): self.ymin = self.ymax - 50 if yr[-1] < self.ymin: self.ymin = yr[-1] if self.ymin + 50 > max(yr): self.ymax = self.ymin + 50 # check self.low self.high and annotate fwd = 2 for n in range(self.i+self.w, self.i+self.w+fwd): self.mark_low_high(n) self.i += fwd # build polynomial fit mms = self.mmh.mms if len(mms) > 1: # mx = [ (x - int(x)) * 86400 for n, x, y in mms[:4] ] # my = [ y for n, x, y in mms[:4] ] mx = [ (x - int(x)) * 86400 for x in xr ] my = yr xre = [ xr[-1] + x/86400. for x in range(1, 181)] xr2 = np.append(xr, xre) # print "mx: ", mx # print "my: ", my polyval = np.polyfit(mx, my, 30) # print "poly1d: ", polyval intx = np.array(xr, dtype = int) sodx = xr - intx sodx *= 86400 s2x = np.append(sodx, range(sodx[-1], sodx[-1]+180)) fit = np.polyval(polyval, s2x) self.fl.set_xdata(xr2) self.fl.set_ydata(fit) maxs = self.mmh.maxs if len(maxs) > 1: n, x1, y1 = maxs[-2] n, x2, y2 = maxs[-1] x3 = xr[-1] polyfit = np.polyfit((x1, x2), (y1, y2), 1) y3 = np.polyval(polyfit, x3) self.mh.set_data((x1, x2, x3), (y1, y2, y3)) mins = self.mmh.mins if len(mins) > 1: n, x1, y1 = mins[-2] n, x2, y2 = mins[-1] x3 = xr[-1] polyfit = np.polyfit((x1, x2), (y1, y2), 1) y3 = np.polyval(polyfit, x3) self.ml.set_data((x1, x2, x3), (y1, y2, y3)) # draw self.ax1.axis([xr[0], xr[-1]+180./86400., self.ymin, self.ymax]) self.canvas.draw() if self.i < len(self.x)-self.w-1: self.after_id = self.root.after(10, self.animate_step) def build_fit(self): pass def mark_low_high(self, n): x = self.x y = self.y rc = self.mmh((n, x[n], y[n])) if rc: nlh, xlh, ylh = rc if self.mmh.trend > 0: # low self.ax1.annotate('low', xy = (x[nlh], y[nlh]), xytext = (x[n], y[nlh]), arrowprops = dict(facecolor = 'red', shrink = 0.05)) # an.set_annotation_clip(True) elif self.mmh.trend < 0: # high self.ax1.annotate('high', xy = (x[nlh], y[nlh]), xytext = (x[n], y[nlh]), arrowprops = dict(facecolor = 'green', shrink = 0.05)) # an.set_annotation_clip(True) def stop(self): if self.after_id: self.root.after_cancel(self.after_id) self.after_id = None def resume(self): if self.after_id is None: self.after_id = self.root.after(10, self.animate_step) def run(self): self.root.after(500, self.animate_start) self.root.mainloop() self.root.destroy() if __name__ == "__main__": app = Main() app.run()