started pattern based analysis

--HG--
branch : sandbox
This commit is contained in:
Andreas
2009-06-28 21:24:39 +00:00
parent d49a1a918f
commit b4890b2241
3 changed files with 578 additions and 14 deletions

View File

@@ -30,6 +30,8 @@ from matplotlib.dates import date2num
# local imports
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
@@ -52,7 +54,7 @@ def tdl(tick_date):
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join("d:\\rttrd-prd-var\\consors-mdf\\data", year, yyyymmdd, filename)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
@@ -240,9 +242,12 @@ class DelayedAcp:
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 1
self.ylow = None
self.yhigh = None
self.advance_count = 1
self.fiblo = None
self.fibhi = None
self.fibs = None
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
@@ -280,22 +285,23 @@ class Main:
# create artists
LOG.debug("Loading ticks...")
self.x, self.y, self.v = tdl(datetime.datetime(2009, 6, 3))
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 25))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.x, self.y)
lows, highs = find_lows_highs(self.xs, self.ys)
self.mmh = Lohi(10)
self.mmh = Lohi(5)
self.w0 = 0
self.wd = 2000
self.low_high_crs = 0
xr, yr, vr = self.tick_window(self.w0, self.wd)
self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0])
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'g:')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'r:')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
@@ -316,7 +322,6 @@ class Main:
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu3 = Tk.Button(master=fr1, text='Resume', command=self.resume)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
@@ -325,7 +330,6 @@ class Main:
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
bu3.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
@@ -335,6 +339,7 @@ class Main:
xr, yr, vr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.fib_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
lohis = self.mmh.lohis
@@ -347,7 +352,7 @@ class Main:
# width of trend channel
mx = 0
for n in range(n0, n1):
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.x[n])) - self.y[n]))
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n]))
a, b = coefs
self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)])
self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)])
@@ -357,7 +362,7 @@ class Main:
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.x) - self.wd - 1:
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
@@ -375,11 +380,54 @@ class Main:
self.ax2.axis([xr[0], xr[-1], 0, 50000])
def tick_window(self, w0, wd = 1000):
return (self.x[w0:w0 + wd], self.y[w0:w0 + wd], self.v[w0:w0 + wd])
return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd])
def fib_low_high(self, n):
tick = (n, self.xs[n], self.ys[n])
redraw = False
n, x, y = tick
hin, hix, hiy = self.fibhi
lon, lox, loy = self.fiblo
delta = hiy - loy
# 61.8, 50.0, 38.2, 23.6 %
y61 = loy + delta * 0.618
y50 = loy + delta * 0.50
y38 = loy + delta * 0.382
y23 = loy + delta * 0.236
if y < self.fiblo[2]:
self.fiblo = tick
if y > self.fibhi[2]:
self.fibhi = tick
if self.fibs is not None:
if lox > hix and y > y50:
self.fibs = None
self.fibhi = tick
if lox < hix and y < y50:
self.fibs = None
self.fiblo = tick
# create fib lines if lo hi differs more than 10 pts
if delta > 10:
xr = (min(lox, hix), x)
if self.fibs is None:
l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-')
l61, = self.ax1.plot_date(xr, (y61, y61), 'r--')
l50, = self.ax1.plot_date(xr, (y50, y50), 'r--')
l38, = self.ax1.plot_date(xr, (y38, y38), 'r--')
l23, = self.ax1.plot_date(xr, (y23, y23), 'r--')
l0, = self.ax1.plot_date(xr, (loy, loy), 'r-')
self.fibs = (l100, l61, l50, l38, l23, l0)
else:
l100, l61, l50, l38, l23, l0 = self.fibs
l100.set_data(xr, (hiy, hiy))
l61.set_data(xr, (y61, y61))
l50.set_data(xr, (y50, y50))
l38.set_data(xr, (y38, y38))
l23.set_data(xr, (y23, y23))
l0.set_data(xr, (loy, loy))
def mark_low_high(self, n):
x = self.x
y = self.y
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
@@ -412,12 +460,15 @@ class Main:
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)

18
mpl/globals.py Normal file
View File

@@ -0,0 +1,18 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
""" Globals
Global variables
Note: this module implements the singleton pattern.
"""
# system imports
# local imports
# constants
RTTRD_VAR = "d:\\rttrd-prd-var"
PAD_DATA = "d:\\rttrd-dev-var\\analysis\\data"

495
mpl/pad1.py Normal file
View File

@@ -0,0 +1,495 @@
# Copyright (c) 2008 Andreas Balogh
# See LICENSE for details.
""" patterns and distance
A. collect data
1. one tick every minute 10-20 mins back
2. use LoHi max until market close as performance indicator
B. cluster and analyse data according to distance
1. find clusters with net positive P&L. many clusters will exhibit useless patterns
C. check performance with backtest
"""
# system imports
import datetime
import os
import re
import logging
import warnings
import math
import Tkinter as Tk
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.dates import date2num
# local imports
from globals import *
# constants
ONE_MINUTE = 60. / 86400.
LOW, NONE, HIGH = range(-1, 2)
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
filename = "%s.csv" % (fiid)
filepath = os.path.join(RTTRD_VAR, "consors-mdf\\data", year, yyyymmdd, filename)
x = [ ]
y = [ ]
v = [ ]
fh = open(filepath, "r")
try:
prev_last = ""
for line in fh:
flds = line.split(",")
# determine file version
if flds[2] == "LAST":
last = float(flds[3])
vol = float(flds[4])
else:
last = float(flds[4])
vol = 0.0
# skip ticks with same last price
if prev_last == last:
continue
else:
prev_last = last
# parse time
mobj = MDF_REO.match(flds[0])
if mobj is None:
raise ValueError("no match for [%s]" % (flds[0],))
(hh, mm, ss, ms) = mobj.groups()
if ms:
c_time = datetime.time(int(hh), int(mm), int(ss), int(ms) * 1000)
else:
c_time = datetime.time(int(hh), int(mm), int(ss))
cdt = datetime.datetime.combine(tick_date, c_time)
x.append(date2num(cdt))
y.append(last)
v.append(vol)
finally:
fh.close()
# throw away first line of file (close price from previous day)
del x[0]
del y[0]
del v[0]
return (x, y, v)
def num2sod(x):
frac, integ = math.modf(x)
return frac * 86400
class Lohi:
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.low0 = None
self.high0 = None
self.prev_lohi = NONE
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.low0 is None:
self.low0 = tick
self.lows.append((n, cdt, last - 1))
if self.high0 is None:
self.high0 = tick
self.highs.append((n, cdt, last + 1))
if last > self.high0[2]:
self.high0 = tick
if self.prev_lohi == NONE:
if self.high0[2] > self.low0[2] + self.bias:
res = self.high0
self.low0 = self.high0
self.lows.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if last < self.low0[2]:
self.low0 = tick
if self.prev_lohi == NONE:
if self.low0[2] < self.high0[2] - self.bias:
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if self.high0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.high0[2] > self.lows[-1][2] + self.bias) or
(self.prev_lohi == HIGH and \
self.high0[2] > self.highs[-1][2])):
res = self.high0
self.low0 = self.high0
self.highs.append(self.high0)
self.lohis.append(self.high0)
self.prev_lohi = HIGH
if self.low0[1] < cdt - ONE_MINUTE and \
((self.prev_lohi == LOW and \
self.low0[2] < self.lows[-1][2]) or
(self.prev_lohi == HIGH and \
self.low0[2] < self.highs[-1][2] - self.bias)):
res = self.low0
self.high0 = self.low0
self.lows.append(self.low0)
self.lohis.append(self.low0)
self.prev_lohi = LOW
if res:
return (self.prev_lohi, res)
else:
return None
def find_lows_highs(xs, ys):
dacp = DelayedAcp(10)
for tick in zip(range(len(xs)), xs, ys):
dacp(tick)
return dacp.lows, dacp.highs
class DelayedAcp:
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.trend = None
self.mm0 = None
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
def __call__(self, tick):
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
"""
n, cdt, last = tick
res = None
# automatic initialisation
if self.mm0 is None:
# initialise water mark
self.mm0 = tick
res = self.mm0
self.lows = [(n, cdt, last - 1)]
self.highs = [(n, cdt, last + 1)]
else:
# initialise trend until price has changed
if self.trend is None or self.trend == 0:
self.trend = cmp(last, self.mm0[2])
# check for max
if self.trend > 0:
if last > self.mm0[2]:
self.mm0 = tick
if last < self.mm0[2] - self.bias:
self.lohis.append(self.mm0)
self.highs.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = -1
# check for min
if self.trend < 0:
if last < self.mm0[2]:
self.mm0 = tick
if last > self.mm0[2] + self.bias:
self.lohis.append(self.mm0)
self.lows.append(self.mm0)
res = self.mm0
# revert trend & water mark
self.mm0 = tick
self.trend = +1
return (cmp(self.trend, 0), res)
def harvest_patterns():
pass
def analyse_patterns():
pass
class Main:
def __init__(self):
warnings.simplefilter("default", np.RankWarning)
self.advance_count = 1
self.ylow = None
self.yhigh = None
self.fiblo = None
self.fibhi = None
self.fibs = None
self.root = Tk.Tk()
self.root.wm_title("Embedding in TK")
# create plot
fig = plt.figure()
self.ax1 = fig.add_subplot(211) # ticks
self.ax2 = fig.add_subplot(212) # volume
# ax3 = fig.add_subplot(313) # cash
self.ax1.set_ylabel("ticks")
self.ax2.set_ylabel("volume")
# ax3.set_ylabel("cash")
major_fmt = mdates.DateFormatter('%H:%M:%S')
major_loc = mdates.MinuteLocator(byminute = range(0, 60, 10))
minor_loc = mdates.MinuteLocator()
self.ax1.xaxis.set_major_formatter(major_fmt)
self.ax1.xaxis.set_major_locator(major_loc)
self.ax1.xaxis.set_minor_locator(minor_loc)
self.ax1.format_xdata = major_fmt
self.ax1.format_ydata = lambda x: '%1.2f' % x
self.ax1.grid(True)
self.ax2.xaxis.set_major_formatter(major_fmt)
self.ax2.xaxis.set_major_locator(mdates.MinuteLocator(byminute = range(0, 60, 10)))
self.ax2.xaxis.set_minor_locator(mdates.MinuteLocator())
self.ax2.format_xdata = major_fmt
self.ax2.format_ydata = lambda x: '%1.2f' % x
self.ax2.grid(True)
# rotates and right aligns the x labels, and moves the bottom of the
# axes up to make room for them
fig.autofmt_xdate()
# create artists
LOG.debug("Loading ticks...")
self.xs, self.ys, self.vs = tdl(datetime.datetime(2009, 6, 25))
LOG.debug("Ticks loaded.")
lows, highs = find_lows_highs(self.xs, self.ys)
self.mmh = Lohi(5)
self.w0 = 0
self.wd = 2000
self.low_high_crs = 0
xr, yr, vr = self.tick_window(self.w0, self.wd)
self.fiblo = self.fibhi = (0, self.xs[0], self.ys[0])
fit = np.average(yr)
self.tl, = self.ax1.plot_date(xr, yr, '-')
self.fl, = self.ax1.plot_date(xr, (fit,) * len(xr), 'k--')
self.mh, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
self.ml, = self.ax1.plot_date(xr, (yr[0],) * len(xr), 'k-')
# Acp markers
self.him, = self.ax1.plot_date([x for n, x, y in lows], [y for n, x, y in lows], 'go')
self.lom, = self.ax1.plot_date([x for n, x, y in highs], [y for n, x, y in highs], 'ro')
self.dl, = self.ax2.plot_date(xr, vr, '-')
self.set_axis(xr, yr)
# embed canvas in Tk
self.canvas = FigureCanvasTkAgg(fig, master=self.root)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=Tk.TRUE)
# toolbar = NavigationToolbar2TkAgg( self.canvas, self.root )
# toolbar.update()
# self.canvas._tkself.canvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)
fr1 = Tk.Frame(master=self.root)
bu1 = Tk.Button(master=fr1, text='Quit', command=self.root.quit)
bu2 = Tk.Button(master=fr1, text='Stop', command=self.stop)
bu4 = Tk.Button(master=fr1, text='1x', command=self.times_one)
bu5 = Tk.Button(master=fr1, text='5x', command=self.times_five)
bu6 = Tk.Button(master=fr1, text='10x', command=self.times_ten)
bu1.pack(side=Tk.RIGHT, padx=5, pady=5)
bu6.pack(side=Tk.RIGHT, padx=5, pady=5)
bu5.pack(side=Tk.RIGHT, padx=5, pady=5)
bu4.pack(side=Tk.RIGHT, padx=5, pady=5)
bu2.pack(side=Tk.RIGHT, padx=5, pady=5)
fr1.pack(side=Tk.BOTTOM)
def animate(self):
self.w0 += self.advance_count
# prepare timeline window
xr, yr, vr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.fib_low_high(self.low_high_crs)
self.low_high_crs += 1
# build polynomial fit
lohis = self.mmh.lohis
if len(lohis) >= 4:
n0, x0, y0 = lohis[-4]
n1, x1, y1 = lohis[-1]
x2 = xr[-1]
coefs = np.polyfit([num2sod(x) for n, x, y in lohis[-4:]], [y for n, x, y in lohis[-4:]], 1)
self.fl.set_data((x0, x2), [np.polyval(coefs, num2sod(x)) for x in (x0, x2)])
# width of trend channel
mx = 0
for n in range(n0, n1):
mx = max(mx, math.fabs(np.polyval(coefs, num2sod(self.xs[n])) - self.ys[n]))
a, b = coefs
self.mh.set_data((x0, x2), [np.polyval((a, b+mx), num2sod(x)) for x in (x0, x2)])
self.ml.set_data((x0, x2), [np.polyval((a, b-mx), num2sod(x)) for x in (x0, x2)])
# update tick line
self.tl.set_data(xr, yr)
self.dl.set_data(xr, vr)
# update axis
self.set_axis(xr, yr)
self.canvas.draw()
if self.w0 < len(self.xs) - self.wd - 1:
self.after_id = self.root.after(10, self.animate)
def set_axis(self, xr, yr, bias=50):
if self.ylow is None:
self.ylow = yr[0] - bias / 2
self.yhigh = yr[0] + bias / 2
for y in yr:
if y < self.ylow:
self.ylow = y
self.yhigh = self.ylow + bias
if y > self.yhigh:
self.yhigh = y
self.ylow = self.yhigh - bias
self.ax1.axis([xr[0], xr[-1], self.ylow, self.yhigh])
self.ax2.axis([xr[0], xr[-1], 0, 50000])
def tick_window(self, w0, wd = 1000):
return (self.xs[w0:w0 + wd], self.ys[w0:w0 + wd], self.vs[w0:w0 + wd])
def fib_low_high(self, n):
tick = (n, self.xs[n], self.ys[n])
redraw = False
n, x, y = tick
hin, hix, hiy = self.fibhi
lon, lox, loy = self.fiblo
delta = hiy - loy
# 61.8, 50.0, 38.2, 23.6 %
y61 = loy + delta * 0.618
y50 = loy + delta * 0.50
y38 = loy + delta * 0.382
y23 = loy + delta * 0.236
if y < self.fiblo[2]:
self.fiblo = tick
if y > self.fibhi[2]:
self.fibhi = tick
if self.fibs is not None:
if lox > hix and y > y50:
self.fibs = None
self.fibhi = tick
if lox < hix and y < y50:
self.fibs = None
self.fiblo = tick
# create fib lines if lo hi differs more than 10 pts
if delta > 10:
xr = (min(lox, hix), x)
if self.fibs is None:
l100, = self.ax1.plot_date(xr, (hiy, hiy), 'r-')
l61, = self.ax1.plot_date(xr, (y61, y61), 'r--')
l50, = self.ax1.plot_date(xr, (y50, y50), 'r--')
l38, = self.ax1.plot_date(xr, (y38, y38), 'r--')
l23, = self.ax1.plot_date(xr, (y23, y23), 'r--')
l0, = self.ax1.plot_date(xr, (loy, loy), 'r-')
self.fibs = (l100, l61, l50, l38, l23, l0)
else:
l100, l61, l50, l38, l23, l0 = self.fibs
l100.set_data(xr, (hiy, hiy))
l61.set_data(xr, (y61, y61))
l50.set_data(xr, (y50, y50))
l38.set_data(xr, (y38, y38))
l23.set_data(xr, (y23, y23))
l0.set_data(xr, (loy, loy))
def mark_low_high(self, n):
x = self.xs
y = self.ys
rc = self.mmh((n, x[n], y[n]))
if rc:
lohi, tick = rc
nlh, xlh, ylh = tick
if lohi < 0:
# low
self.ax1.annotate('low',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='red',
frac=0.3,
shrink=0.1))
elif lohi > 0:
# high
self.ax1.annotate('high',
xy=(x[nlh], y[nlh]),
xytext=(x[n], y[nlh]),
arrowprops=dict(facecolor='green',
frac=0.3,
shrink=0.1))
def stop(self):
if self.after_id:
self.root.after_cancel(self.after_id)
self.after_id = None
def resume(self):
if self.after_id is None:
self.after_id = self.root.after(10, self.animate)
def times_one(self):
self.advance_count = 1
self.resume()
def times_five(self):
self.advance_count = 5
self.resume()
def times_ten(self):
self.advance_count = 10
self.resume()
def run(self):
self.root.after(500, self.animate)
self.root.mainloop()
self.root.destroy()
if __name__ == "__main__":
app = Main()
app.run()