# Copyright (c) 2009 Andreas Balogh # See LICENSE for details. """ Online sliding window with trend analysis 1. segment tick data with a sliding window alogrithm 2. recognise low/high points by comparing slope information 3. recognise trend by observing low/high point difference """ # system imports import datetime import os import re import logging import warnings import math import Tkinter as Tk import numpy as np import matplotlib as mpl mpl.use('TkAgg') from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import matplotlib.pyplot as plt import matplotlib.dates as mdates from matplotlib.dates import date2num # local imports from globals import * # constants ONE_MINUTE = 60. / 86400. LOW, NONE, HIGH = range(-1, 2) # globals LOG = logging.getLogger() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', datefmt='%H:%M:%S') MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") def tdl(tick_date): """ returns a list of tick tuples (cdt, last) for specified day """ fiid = "846900" year = tick_date.strftime("%Y") yyyymmdd = tick_date.strftime("%Y%m%d") filename = "%s.csv" % (fiid) filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) x = [ ] y = [ ] v = [ ] fh = open(filepath, "r") try: prev_last = "" for line in fh: flds = line.split(",") # determine file version if flds[2] == "LAST": last = float(flds[3]) vol = float(flds[4]) else: last = float(flds[4]) vol = 0.0 # skip ticks with same last price if prev_last == last: continue else: prev_last = last # parse time mobj = MDF_REO.match(flds[0]) if mobj is None: raise ValueError("no match for [%s]" % (flds[0],)) (hh, mm, ss, ms) = mobj.groups() if ms: c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) else: c_time = datetime.time(int(hh), int(mm), int(ss)) cdt = datetime.datetime.combine(tick_date, c_time) x.append(date2num(cdt)) y.append(last) v.append(vol) finally: fh.close() # throw away first line of file (close price from previous day) del x[0] del y[0] del v[0] return (x, y, v) def interpolate_line(xs, ys): """Fit a straight line y = bx + a to a set of points (x, y) """ # from two data points only! x1, x2 = xs y1, y2 = ys try: b = ( y2 - y1 ) / ( x2 - x1 ) except ZeroDivisionError: print "interpolate_line: division by zero, ", x1, x2, y1, y2 b = 0.0 a = y1 - b * x1 return (b, a) def num2sod(x): frac, integ = math.modf(x) return frac * 86400 class TimedLohi: """Time series online low and high detector. Confirms low/high candidates after timeout. Time dependent. """ def __init__(self, bias, timeout = ONE_MINUTE): assert(bias > 0) self.bias = bias self.timeout = timeout self.low0 = None self.high0 = None self.prev_lohi = NONE self.lohis = [ ] self.lows = [ ] self.highs = [ ] def __call__(self, tick): """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. """ n, cdt, last = tick res = None # automatic initialisation if self.low0 is None: self.low0 = tick self.lows.append((n, cdt, last - 1)) if self.high0 is None: self.high0 = tick self.highs.append((n, cdt, last + 1)) if last > self.high0[2]: self.high0 = tick if self.prev_lohi == NONE: if self.high0[2] > self.low0[2] + self.bias: res = self.high0 self.low0 = self.high0 self.lows.append(self.high0) self.lohis.append(self.high0) self.prev_lohi = HIGH if last < self.low0[2]: self.low0 = tick if self.prev_lohi == NONE: if self.low0[2] < self.high0[2] - self.bias: res = self.low0 self.high0 = self.low0 self.lows.append(self.low0) self.lohis.append(self.low0) self.prev_lohi = LOW if self.high0[1] < cdt - self.timeout and \ ((self.prev_lohi == LOW and \ self.high0[2] > self.lows[-1][2] + self.bias) or (self.prev_lohi == HIGH and \ self.high0[2] > self.highs[-1][2])): res = self.high0 self.low0 = self.high0 self.highs.append(self.high0) self.lohis.append(self.high0) self.prev_lohi = HIGH if self.low0[1] < cdt - self.timeout and \ ((self.prev_lohi == LOW and \ self.low0[2] < self.lows[-1][2]) or (self.prev_lohi == HIGH and \ self.low0[2] < self.highs[-1][2] - self.bias)): res = self.low0 self.high0 = self.low0 self.lows.append(self.low0) self.lohis.append(self.low0) self.prev_lohi = LOW if res: return (self.prev_lohi, res) else: return None def find_lows_highs(xs, ys): dacp = DelayedAcp(10) for tick in zip(range(len(xs)), xs, ys): dacp(tick) return dacp.lows, dacp.highs class DelayedAcp: """Time series max & min detector.""" def __init__(self, bias): assert(bias > 0) self.bias = bias self.trend = None self.mm0 = None self.lohis = [ ] self.lows = [ ] self.highs = [ ] def __call__(self, tick): """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(n, cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. """ n, cdt, last = tick res = None # automatic initialisation if self.mm0 is None: # initialise water mark self.mm0 = tick res = self.mm0 self.lows = [(n, cdt, last - 1)] self.highs = [(n, cdt, last + 1)] else: # initialise trend until price has changed if self.trend is None or self.trend == 0: self.trend = cmp(last, self.mm0[2]) # check for max if self.trend > 0: if last > self.mm0[2]: self.mm0 = tick if last < self.mm0[2] - self.bias: self.lohis.append(self.mm0) self.highs.append(self.mm0) res = self.mm0 # revert trend & water mark self.mm0 = tick self.trend = -1 # check for min if self.trend < 0: if last < self.mm0[2]: self.mm0 = tick if last > self.mm0[2] + self.bias: self.lohis.append(self.mm0) self.lows.append(self.mm0) res = self.mm0 # revert trend & water mark self.mm0 = tick self.trend = +1 return (cmp(self.trend, 0), res) class SlidingWindow: """Douglas-Peucker algorithm.""" def __init__(self, bias): assert(bias > 0) self.bias = bias self.xs = [ ] self.ys = [ ] self.segx = [ ] self.segy = [ ] self.types = [ ] self.bs = [ ] def __call__(self, tick): """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(n, cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. """ n, cdt, last = tick max_distance = self.bias rc = None self.xs.append(cdt) self.ys.append(last) x0, y0 = (self.xs[0], self.ys[0]) x1, y1 = (self.xs[-1], self.ys[-1]) if n == 0: self.segx.append(x0) self.segy.append(y0) if len(self.xs) < 2: return None # check distance coefs = interpolate_line((x0, x1), (y0, y1)) ip_ys = np.polyval(coefs, self.xs) d_ys = np.absolute(self.ys - ip_ys) d_max = np.amax(d_ys) if d_max > max_distance: n = np.argmax(d_ys) x2, y2 = (self.xs[n], self.ys[n]) self.segx.append(x2) self.segy.append(y2) segment_added = True # store slope of segment b0, a0 = interpolate_line((x0, x2), (y0, y2)) self.bs.append(b0) # remove ticks of previous segment del self.xs[0:n] del self.ys[0:n] # slope of current segment x0, y0 = (self.xs[0], self.ys[0]) b1, a1 = interpolate_line((x0, x1), (y0, y1)) lohi = self.get_type(b0, b1) rc = (x2, y2, lohi) return (self.segx + [x1], self.segy + [y1], rc) def get_type(self, b0, b1): """ calculate gearing y: previous slope, x: current slope <0 ~0 >0 <0 L L L ~0 H 0 L >0 H H H """ if b0 < -SMALL and b1 < -SMALL and b0 > b1: lohi = "d+" elif b0 < -SMALL and b1 < SMALL and b0 < b1: lohi = "d-" elif b0 < -SMALL and b1 > SMALL: lohi = "L" elif abs(b0) < SMALL and b1 < -SMALL: lohi = "d+" elif abs(b0) < SMALL and abs(b1) < SMALL: lohi = "0" elif abs(b0) < SMALL and b1 > SMALL: lohi = "u+" elif b0 > SMALL and b1 < -SMALL: lohi = "H" elif b0 > SMALL and b1 > -SMALL and b0 > b1: lohi = "u-" elif b0 > SMALL and b1 > SMALL and b0 < b1: lohi = "u+" else: lohi = "?" return lohi SMALL = 1E-10 class Main: def __init__(self): warnings.simplefilter("default", np.RankWarning) self.advance_count = 10 self.ylow = None self.yhigh = None self.segs = [ ] self.root = Tk.Tk() self.root.wm_title("Embedding in TK") # create plot fig = plt.figure() self.ax1 = fig.add_subplot(311) # ticks self.ax2 = fig.add_subplot(312) # slope of line segement self.ax3 = fig.add_subplot(313) # moving average (10min) self.ax1.set_ylabel("ticks") self.ax2.set_ylabel("slope") self.ax3.set_ylabel("gearing") major_fmt = mdates.DateFormatter('%H:%M:%S') self.ax1.xaxis.set_major_formatter(major_fmt) self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator()) self.ax1.format_xdata = major_fmt self.ax1.format_ydata = lambda x: '%1.2f' % x self.ax1.grid(True) self.ax2.xaxis.set_major_formatter(major_fmt) self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) self.ax2.format_xdata = major_fmt self.ax2.format_ydata = lambda x: '%1.2f' % x self.ax2.grid(True) self.ax3.xaxis.set_major_formatter(major_fmt) self.ax3.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) self.ax3.xaxis.set_minor_locator(mdates.MinuteLocator()) self.ax3.format_xdata = major_fmt self.ax3.format_ydata = lambda x: '%1.2f' % x self.ax3.grid(True) # rotates and right aligns the x labels, and moves the bottom of the # axes up to make room for them fig.autofmt_xdate() # create artists LOG.debug("Loading ticks...") self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 2)) LOG.debug("Ticks loaded.") lows, highs = find_lows_highs(self.xs, self.ys) self.mas = self.ys[:] self.ss = [ 0 ] * len(self.xs) self.gs = [ 0 ] * len(self.xs) self.mmh = TimedLohi(5) self.osw = SlidingWindow(5) self.w0 = 0 self.wd = 2000 self.low_high_crs = 0 xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd) self.n0 = 0 # top subplot self.tl, = self.ax1.plot_date(xr, yr, '-') self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-') # Acp markers self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') # volume subplot # self.dl, = self.ax2.plot_date(xr, vr, '-') self.dl, = self.ax1.plot_date(xr, vr, 'g-') # slope subplot self.sl, = self.ax2.plot_date(xr, sr, '-') # gearing subplot self.gl, = self.ax3.plot_date(xr, gr, '-') self.set_axis(xr, yr) # embed canvas in Tk self.canvas = FigureCanvasTkAgg(fig, master=self.root) self.canvas.draw() self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) # toolbar.update() # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) fr1 = Tk.Frame(master=self.root) bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) bu1.pack(side=Tk.RIGHT, padx=5, pady=5) bu6.pack(side=Tk.RIGHT, padx=5, pady=5) bu5.pack(side=Tk.RIGHT, padx=5, pady=5) bu4.pack(side=Tk.RIGHT, padx=5, pady=5) bu2.pack(side=Tk.RIGHT, padx=5, pady=5) fr1.pack(side=Tk.BOTTOM) def animate(self): self.w0 += self.advance_count # prepare timeline window xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd) while self.low_high_crs < self.w0 + self.wd: # self.mark_low_high(self.low_high_crs) self.mark_segments(self.low_high_crs) self.ma(self.low_high_crs, 10) self.low_high_crs += 1 # update tick line self.tl.set_data(xr, yr) # update segment slope self.sl.set_data(xr, sr) # update volume line self.dl.set_data(xr, vr) # gearing line self.gl.set_data(xr, gr) # update axis self.set_axis(xr, yr) self.canvas.draw() if self.w0 < len(self.xs) - self.wd - 1: self.after_id = self.root.after(10, self.animate) def set_axis(self, xr, yr, bias=50): if self.ylow is None: self.ylow = yr[0] - bias / 2 self.yhigh = yr[0] + bias / 2 for y in yr: if y < self.ylow: self.ylow = y self.yhigh = self.ylow + bias if y > self.yhigh: self.yhigh = y self.ylow = self.yhigh - bias self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) self.ax2.axis([xr[0], xr[-1], -5, +5]) self.ax3.axis([xr[0], xr[-1], -50, +50]) def tick_window(self, w0, wd = 1000): return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.mas[w0:w0 + wd], self.ss[w0:w0 + wd], self.gs[w0:w0+wd]) def ma(self, n0, min): self.mas[n0] = np.average(self.ys[n0-min*60:n0]) self.gs[n0] = self.ys[n0] - self.mas[n0] + self.ss[n0] def mark_segments(self, n): x = self.xs y = self.ys rc = self.osw((n, x[n], y[n])) if rc is not None: segx, segy, lohi = rc self.seg.set_data(segx, segy) if lohi is not None: text = lohi[2] if text == "u+": fc = "blue" dy = -15 elif text == "d+": fc = "blue" dy = +15 elif text == "H": fc = "green" dy = +15 elif text == "L": fc = "red" dy = -15 else: fc = None if fc: self.ax1.annotate(text, xy=(lohi[0], lohi[1]), xytext=(segx[-1], segy[-2]+dy), arrowprops=dict(facecolor=fc, frac=0.3, shrink=0.1)) def mark_low_high(self, n): x = self.xs y = self.ys rc = self.mmh((n, x[n], y[n])) if rc: lohi, tick = rc nlh, xlh, ylh = tick if lohi < 0: # low self.ax1.annotate('low', xy=(x[nlh], y[nlh]), xytext=(x[n], y[nlh]), arrowprops=dict(facecolor='red', frac=0.3, shrink=0.1)) elif lohi > 0: # high self.ax1.annotate('high', xy=(x[nlh], y[nlh]), xytext=(x[n], y[nlh]), arrowprops=dict(facecolor='green', frac=0.3, shrink=0.1)) def stop(self): if self.after_id: self.root.after_cancel(self.after_id) self.after_id = None def resume(self): if self.after_id is None: self.after_id = self.root.after(10, self.animate) def times_one(self): self.advance_count = 1 self.resume() def times_five(self): self.advance_count = 5 self.resume() def times_ten(self): self.advance_count = 10 self.resume() def run(self): self.root.after(500, self.animate) self.root.mainloop() self.root.destroy() if __name__ == "__main__": app = Main() app.run()