diff --git a/mpl/namedtuple.py b/mpl/namedtuple.py new file mode 100644 index 0000000..ab888eb --- /dev/null +++ b/mpl/namedtuple.py @@ -0,0 +1,40 @@ +# http://code.activestate.com/recipes/500261/ + +from operator import itemgetter +import sys + +def __from_iterable__(cls,arg): + return cls.__new__(cls,*arg) + + +def NamedTuple(typename, field_names): + if isinstance(field_names, str): + field_names = field_names.split() + nargs = len(field_names) + + def __new__(cls, *args, **kwds): + if (len(args) == 1) and (getattr(args[0], '__iter__', False)): + args = tuple(name for name in args[0]) + if kwds: + try: + args += tuple(kwds[name] for name in field_names[len(args):]) + except KeyError, name: + raise TypeError('%s missing required argument: %s' % (typename, name)) + if len(args) != nargs: + raise TypeError('%s takes exactly %d arguments (%d given)' % (typename, nargs, len(args))) + return tuple.__new__(cls, args) + + repr_template = '%s(%s)' % (typename, ', '.join('%s=%%r' % name for name in field_names)) + + m = dict(vars(tuple)) # pre-lookup superclass methods (for faster lookup) + m.update(__doc__= '%s(%s)' % (typename, ', '.join(field_names)), + __slots__ = (), # no per-instance dict (so instances are same size as tuples) + __new__ = __new__, + __repr__ = lambda self, _format=repr_template.__mod__: _format(self), + __module__ = sys._getframe(1).f_globals['__name__'], + __field_names__ = tuple(field_names), + __from_iterable__=classmethod(__from_iterable__), + ) + m.update((name, property(itemgetter(index))) for index, name in enumerate(field_names)) + + return type(typename, (tuple,), m) diff --git a/mpl/sw-trend1.py b/mpl/sw-trend1.py index 0cd8b83..5bcad37 100644 --- a/mpl/sw-trend1.py +++ b/mpl/sw-trend1.py @@ -362,12 +362,12 @@ class Main: # create plot fig = plt.figure() - self.ax1 = fig.add_subplot(311) # ticks - self.ax2 = fig.add_subplot(312) # slope of line segement - self.ax3 = fig.add_subplot(313) # moving average (10min) + self.ax1 = fig.add_subplot(211) # ticks + # self.ax2 = fig.add_subplot(312) # slope of line segement + self.ax3 = fig.add_subplot(212) # moving average (10min) self.ax1.set_ylabel("ticks") - self.ax2.set_ylabel("slope") + # self.ax2.set_ylabel("slope") self.ax3.set_ylabel("gearing") major_fmt = mdates.DateFormatter('%H:%M:%S') @@ -378,12 +378,14 @@ class Main: self.ax1.format_ydata = lambda x: '%1.2f' % x self.ax1.grid(True) + """ self.ax2.xaxis.set_major_formatter(major_fmt) self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) self.ax2.format_xdata = major_fmt self.ax2.format_ydata = lambda x: '%1.2f' % x self.ax2.grid(True) + """ self.ax3.xaxis.set_major_formatter(major_fmt) self.ax3.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) @@ -398,7 +400,7 @@ class Main: # create artists LOG.debug("Loading ticks...") - self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 2)) + self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1)) LOG.debug("Ticks loaded.") lows, highs = find_lows_highs(self.xs, self.ys) self.mas = self.ys[:] @@ -406,7 +408,7 @@ class Main: self.gs = [ 0 ] * len(self.xs) self.mmh = TimedLohi(5) - self.osw = SlidingWindow(5) + self.osw = SlidingWindow(2) self.w0 = 0 self.wd = 2000 @@ -426,7 +428,7 @@ class Main: self.dl, = self.ax1.plot_date(xr, vr, 'g-') # slope subplot - self.sl, = self.ax2.plot_date(xr, sr, '-') + # self.sl, = self.ax2.plot_date(xr, sr, '-') # gearing subplot self.gl, = self.ax3.plot_date(xr, gr, '-') @@ -468,7 +470,7 @@ class Main: # update tick line self.tl.set_data(xr, yr) # update segment slope - self.sl.set_data(xr, sr) + # self.sl.set_data(xr, sr) # update volume line self.dl.set_data(xr, vr) # gearing line @@ -491,7 +493,7 @@ class Main: self.yhigh = y self.ylow = self.yhigh - bias self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) - self.ax2.axis([xr[0], xr[-1], -5, +5]) + # self.ax2.axis([xr[0], xr[-1], -5, +5]) self.ax3.axis([xr[0], xr[-1], -50, +50]) def tick_window(self, w0, wd = 1000): diff --git a/mpl/sw-trend2.py b/mpl/sw-trend2.py new file mode 100644 index 0000000..3ed7ca0 --- /dev/null +++ b/mpl/sw-trend2.py @@ -0,0 +1,604 @@ +# Copyright (c) 2009 Andreas Balogh +# See LICENSE for details. + +""" +Online sliding window with trend analysis + +1. segment tick data with a sliding window alogrithm +2. recognise low/high points by comparing slope information +3. recognise trend by observing low/high point difference +""" + +# system imports + +import datetime +import os +import re +import logging +import warnings +import math + +import Tkinter as Tk +import numpy as np + +import matplotlib as mpl +mpl.use('TkAgg') + +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +from matplotlib.dates import date2num + +# local imports + +from namedtuple import NamedTuple +from globals import * + +# constants + +ONE_MINUTE = 60. / 86400. +LOW, NONE, HIGH = range(-1, 2) + +Trend = NamedTuple('Trend', 'n x y') + +# globals + +LOG = logging.getLogger() + +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', + datefmt='%H:%M:%S') + +MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") + + +def tdl(tick_date): + """ returns a list of tick tuples (cdt, last) for specified day """ + fiid = "846900" + year = tick_date.strftime("%Y") + yyyymmdd = tick_date.strftime("%Y%m%d") + filename = "%s.csv" % (fiid) + filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename) + x = [ ] + y = [ ] + v = [ ] + fh = open(filepath, "r") + try: + prev_last = "" + for line in fh: + flds = line.split(",") + # determine file version + if flds[2] == "LAST": + last = float(flds[3]) + vol = float(flds[4]) + else: + last = float(flds[4]) + vol = 0.0 + # skip ticks with same last price + if prev_last == last: + continue + else: + prev_last = last + # parse time + mobj = MDF_REO.match(flds[0]) + if mobj is None: + raise ValueError("no match for [%s]" % (flds[0],)) + (hh, mm, ss, ms) = mobj.groups() + if ms: + c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000) + else: + c_time = datetime.time(int(hh), int(mm), int(ss)) + cdt = datetime.datetime.combine(tick_date, c_time) + x.append(date2num(cdt)) + y.append(last) + v.append(vol) + finally: + fh.close() + # throw away first line of file (close price from previous day) + del x[0] + del y[0] + del v[0] + return (x, y, v) + +def interpolate_line(xs, ys): + """Fit a straight line y = bx + a to a set of points (x, y) """ + # from two data points only! + x1, x2 = xs + y1, y2 = ys + try: + b = ( y2 - y1 ) / ( x2 - x1 ) + except ZeroDivisionError: + print "interpolate_line: division by zero, ", x1, x2, y1, y2 + b = 0.0 + a = y1 - b * x1 + return (b, a) + +def num2sod(x): + frac, integ = math.modf(x) + return frac * 86400 + + +class Bunch: + def __init__(self, **kwds): + self.__dict__.update(kwds) + + +class TimedLohi: + """Time series online low and high detector. + + Confirms low/high candidates after timeout. + Time dependent. + """ + def __init__(self, bias, timeout = ONE_MINUTE): + assert(bias > 0) + self.bias = bias + self.timeout = timeout + self.low0 = None + self.high0 = None + self.prev_lohi = NONE + self.lohis = [ ] + self.lows = [ ] + self.highs = [ ] + + def __call__(self, tick): + """Add extended tick to the max min parser. + + @param tick: The value of the current tick. + @type tick: tuple(cdt, last) + + @return: 1. Tick if new max min has been detected, + 2. None otherwise. + """ + n, cdt, last = tick + res = None + # automatic initialisation + if self.low0 is None: + self.low0 = tick + self.lows.append((n, cdt, last - 1)) + if self.high0 is None: + self.high0 = tick + self.highs.append((n, cdt, last + 1)) + if last > self.high0[2]: + self.high0 = tick + if self.prev_lohi == NONE: + if self.high0[2] > self.low0[2] + self.bias: + res = self.high0 + self.low0 = self.high0 + self.lows.append(self.high0) + self.lohis.append(self.high0) + self.prev_lohi = HIGH + if last < self.low0[2]: + self.low0 = tick + if self.prev_lohi == NONE: + if self.low0[2] < self.high0[2] - self.bias: + res = self.low0 + self.high0 = self.low0 + self.lows.append(self.low0) + self.lohis.append(self.low0) + self.prev_lohi = LOW + if self.high0[1] < cdt - self.timeout and \ + ((self.prev_lohi == LOW and \ + self.high0[2] > self.lows[-1][2] + self.bias) or + (self.prev_lohi == HIGH and \ + self.high0[2] > self.highs[-1][2])): + res = self.high0 + self.low0 = self.high0 + self.highs.append(self.high0) + self.lohis.append(self.high0) + self.prev_lohi = HIGH + if self.low0[1] < cdt - self.timeout and \ + ((self.prev_lohi == LOW and \ + self.low0[2] < self.lows[-1][2]) or + (self.prev_lohi == HIGH and \ + self.low0[2] < self.highs[-1][2] - self.bias)): + res = self.low0 + self.high0 = self.low0 + self.lows.append(self.low0) + self.lohis.append(self.low0) + self.prev_lohi = LOW + if res: + return (self.prev_lohi, res) + else: + return None + + +def find_lows_highs(xs, ys): + dacp = DelayedAcp(10) + for tick in zip(range(len(xs)), xs, ys): + dacp(tick) + return dacp.lows, dacp.highs + + +class DelayedAcp: + """Time series max & min detector.""" + def __init__(self, bias): + assert(bias > 0) + self.bias = bias + self.trend = None + self.mm0 = None + self.lohis = [ ] + self.lows = [ ] + self.highs = [ ] + + def __call__(self, tick): + """Add extended tick to the max min parser. + + @param tick: The value of the current tick. + @type tick: tuple(n, cdt, last) + + @return: 1. Tick if new max min has been detected, + 2. None otherwise. + """ + n, cdt, last = tick + res = None + # automatic initialisation + if self.mm0 is None: + # initialise water mark + self.mm0 = tick + res = self.mm0 + self.lows = [(n, cdt, last - 1)] + self.highs = [(n, cdt, last + 1)] + else: + # initialise trend until price has changed + if self.trend is None or self.trend == 0: + self.trend = cmp(last, self.mm0[2]) + # check for max + if self.trend > 0: + if last > self.mm0[2]: + self.mm0 = tick + if last < self.mm0[2] - self.bias: + self.lohis.append(self.mm0) + self.highs.append(self.mm0) + res = self.mm0 + # revert trend & water mark + self.mm0 = tick + self.trend = -1 + # check for min + if self.trend < 0: + if last < self.mm0[2]: + self.mm0 = tick + if last > self.mm0[2] + self.bias: + self.lohis.append(self.mm0) + self.lows.append(self.mm0) + res = self.mm0 + # revert trend & water mark + self.mm0 = tick + self.trend = +1 + return (cmp(self.trend, 0), res) + + +class SlidingWindow: + """Douglas-Peucker algorithm.""" + def __init__(self, bias): + assert(bias > 0) + self.bias = bias + self.xs = [ ] + self.ys = [ ] + self.segx = [ ] + self.segy = [ ] + self.types = [ ] + self.bs = [ ] + + def __call__(self, tick): + """Add extended tick to the max min parser. + + @param tick: The value of the current tick. + @type tick: tuple(n, cdt, last) + + @return: 1. Tick if new max min has been detected, + 2. None otherwise. + """ + n, cdt, last = tick + max_distance = self.bias + rc = None + self.xs.append(cdt) + self.ys.append(last) + x0, y0 = (self.xs[0], self.ys[0]) + x1, y1 = (self.xs[-1], self.ys[-1]) + if n == 0: + self.segx.append(x0) + self.segy.append(y0) + if len(self.xs) < 2: + return None + # check distance + coefs = interpolate_line((x0, x1), (y0, y1)) + ip_ys = np.polyval(coefs, self.xs) + d_ys = np.absolute(self.ys - ip_ys) + d_max = np.amax(d_ys) + if d_max > max_distance: + n = np.argmax(d_ys) + x2, y2 = (self.xs[n], self.ys[n]) + self.segx.append(x2) + self.segy.append(y2) + segment_added = True + # store slope of segment + b0, a0 = interpolate_line((x0, x2), (y0, y2)) + self.bs.append(b0) + # remove ticks of previous segment + del self.xs[0:n] + del self.ys[0:n] + # slope of current segment + x0, y0 = (self.xs[0], self.ys[0]) + b1, a1 = interpolate_line((x0, x1), (y0, y1)) + lohi = self.get_type(b0, b1) + rc = (x2, y2, lohi) + return (self.segx + [x1], self.segy + [y1], rc) + + def get_type(self, b0, b1): + """ calculate gearing + y: previous slope, x: current slope + <0 ~0 >0 + <0 L L L + ~0 H 0 L + >0 H H H + """ + if b0 < -SMALL and b1 < -SMALL and b0 > b1: + lohi = "d+" + elif b0 < -SMALL and b1 < SMALL and b0 < b1: + lohi = "d-" + elif b0 < -SMALL and b1 > SMALL: + lohi = "L" + elif abs(b0) < SMALL and b1 < -SMALL: + lohi = "d+" + elif abs(b0) < SMALL and abs(b1) < SMALL: + lohi = "0" + elif abs(b0) < SMALL and b1 > SMALL: + lohi = "u+" + elif b0 > SMALL and b1 < -SMALL: + lohi = "H" + elif b0 > SMALL and b1 > -SMALL and b0 > b1: + lohi = "u-" + elif b0 > SMALL and b1 > SMALL and b0 < b1: + lohi = "u+" + else: + lohi = "?" + return lohi + + +SMALL = 1E-10 + + +class Main: + def __init__(self): + warnings.simplefilter("default", np.RankWarning) + self.advance_count = 10 + self.ylow = None + self.yhigh = None + self.trend_starts = None + self.segs = [ ] + + self.root = Tk.Tk() + self.root.wm_title("Embedding in TK") + + # create plot + fig = plt.figure() + self.ax1 = fig.add_subplot(211) # ticks + self.ax2 = fig.add_subplot(212) # moving average (10min) + + self.ax1.set_ylabel("ticks") + self.ax2.set_ylabel("gearing") + + major_fmt = mdates.DateFormatter('%H:%M:%S') + self.ax1.xaxis.set_major_formatter(major_fmt) + self.ax1.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) + self.ax1.xaxis.set_minor_locator(mdates.MinuteLocator()) + self.ax1.format_xdata = major_fmt + self.ax1.format_ydata = lambda x: '%1.2f' % x + self.ax1.grid(True) + + self.ax2.xaxis.set_major_formatter(major_fmt) + self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10))) + self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator()) + self.ax2.format_xdata = major_fmt + self.ax2.format_ydata = lambda x: '%1.2f' % x + self.ax2.grid(True) + + # rotates and right aligns the x labels, and moves the bottom of the + # axes up to make room for them + fig.autofmt_xdate() + + # create artists + LOG.debug("Loading ticks...") + self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 7, 1)) + LOG.debug("Ticks loaded.") + lows, highs = find_lows_highs(self.xs, self.ys) + self.mas = self.ys[:] + + self.mmh = TimedLohi(5) + self.osw = SlidingWindow(2) + + self.w0 = 0 + self.wd = 2000 + self.w_crs = 0 + xr, yr, mar = self.tick_window(self.w0, self.wd) + self.gr = [0.0] * self.wd + + # add artists to top subplot + # tick line and segments + self.tl, = self.ax1.plot_date(xr, yr, '-') + self.seg, = self.ax1.plot_date((xr[0], xr[1]), (yr[0], yr[1]), 'k-') + # Acp markers + self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go') + self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro') + # trend lines + self.trd, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k--') + self.trh, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-') + self.trl, = self.ax1.plot_date(xr[0:1], yr[0:1], 'k-') + + # add artists to bottom subplot + self.gl, = self.ax2.plot_date(xr, self.gr, '-') + + self.set_axis(xr, yr) + + # embed canvas in Tk + self.canvas = FigureCanvasTkAgg(fig, master=self.root) + self.canvas.draw() + self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE) + + # toolbar = NavigationToolbar2TkAgg( self.canvas, self.root ) + # toolbar.update() + # self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) + + fr1 = Tk.Frame(master=self.root) + bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit) + bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop) + bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one) + bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five) + bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten) + bu1.pack(side=Tk.RIGHT, padx=5, pady=5) + bu6.pack(side=Tk.RIGHT, padx=5, pady=5) + bu5.pack(side=Tk.RIGHT, padx=5, pady=5) + bu4.pack(side=Tk.RIGHT, padx=5, pady=5) + bu2.pack(side=Tk.RIGHT, padx=5, pady=5) + fr1.pack(side=Tk.BOTTOM) + + + def animate(self): + self.w0 += self.advance_count + # prepare timeline window + while self.w_crs < self.w0 + self.wd: + self.ma(self.w_crs, 10) + self.fitter(self.w_crs) + self.w_crs += 1 + xr, yr, mar = self.tick_window(self.w0, self.wd) + # update tick line + self.tl.set_data(xr, yr) + # gearing line + # self.gl.set_data(xr, gr) + # update axis + self.set_axis(xr, yr) + self.canvas.draw() + if self.w0 < len(self.xs) - self.wd - 1: + self.after_id = self.root.after(10, self.animate) + + def set_axis(self, xr, yr, bias=50): + if self.ylow is None: + self.ylow = yr[0] - bias / 2 + self.yhigh = yr[0] + bias / 2 + for y in yr: + if y < self.ylow: + self.ylow = y + self.yhigh = self.ylow + bias + if y > self.yhigh: + self.yhigh = y + self.ylow = self.yhigh - bias + self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh]) + self.ax2.axis([xr[0], xr[-1], -50, +50]) + + def tick_window(self, w0, wd = 1000): + return (self.xs[w0:w0 + wd], + self.ys[w0:w0 + wd], + self.mas[w0:w0 + wd], + ) + + def ma(self, n0, min): + self.mas[n0] = np.average(self.ys[n0-min*60:n0]) + + def fitter(self, n0): + # find last low/high within t-1 + # linear regression from t-5 to t-1 + # linear regression within t-1 + # visual inspection + + # determine run-on low and highs + if self.trend_starts is None: + self.trend_starts = [Trend(n=n0, x=self.xs[n0], y=self.ys[n0])] + trend_start = self.trend_starts[-1] + # wait for 30 secs to stabilise + if trend_start.n + 30 > n0: + return + # fit trend + xr = self.xs[trend_start.n:n0] + yr = self.ys[trend_start.n:n0] + ps = np.polyfit(xr, yr, 1) + trend_xs = [xr[0], xr[-1]] + trend_ys = np.polyval(ps, trend_xs) + self.trd.set_data(trend_xs, trend_ys) + # fit counter trend + + + def mark_segments(self, n): + x = self.xs + y = self.ys + rc = self.osw((n, x[n], y[n])) + if rc is not None: + segx, segy, lohi = rc + self.seg.set_data(segx, segy) + if lohi is not None: + text = lohi[2] + if text == "u+": + fc = "blue" + dy = -15 + elif text == "d+": + fc = "blue" + dy = +15 + elif text == "H": + fc = "green" + dy = +15 + elif text == "L": + fc = "red" + dy = -15 + else: + fc = None + if fc: + self.ax1.annotate(text, + xy=(lohi[0], lohi[1]), + xytext=(segx[-1], segy[-2]+dy), + arrowprops=dict(facecolor=fc, + frac=0.3, + shrink=0.1)) + + def mark_low_high(self, n): + x = self.xs + y = self.ys + rc = self.mmh((n, x[n], y[n])) + if rc: + lohi, tick = rc + nlh, xlh, ylh = tick + if lohi < 0: + # low + self.ax1.annotate('low', + xy=(x[nlh], y[nlh]), + xytext=(x[n], y[nlh]), + arrowprops=dict(facecolor='red', + frac=0.3, + shrink=0.1)) + elif lohi > 0: + # high + self.ax1.annotate('high', + xy=(x[nlh], y[nlh]), + xytext=(x[n], y[nlh]), + arrowprops=dict(facecolor='green', + frac=0.3, + shrink=0.1)) + + def stop(self): + if self.after_id: + self.root.after_cancel(self.after_id) + self.after_id = None + + def resume(self): + if self.after_id is None: + self.after_id = self.root.after(10, self.animate) + + def times_one(self): + self.advance_count = 1 + self.resume() + + def times_five(self): + self.advance_count = 5 + self.resume() + + def times_ten(self): + self.advance_count = 10 + self.resume() + + def run(self): + self.root.after(500, self.animate) + self.root.mainloop() + self.root.destroy() + + +if __name__ == "__main__": + app = Main() + app.run()