online sliding window implemented

--HG--
branch : sandbox
This commit is contained in:
Andreas
2009-07-22 21:19:25 +00:00
parent 8d3a83181c
commit 3cd2666eaf
3 changed files with 236 additions and 83 deletions

View File

@@ -1,13 +1,13 @@
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
'''
Online bottom-up
"""
Online sliding window with trend analysis
1. segment tick data with a sliding window alogrithm
2. recognise low/high points by comparing slope information
3. recognise trend by observing low/high point difference
'''
"""
# system imports
@@ -50,7 +50,7 @@ MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*")
def tdl(tick_date):
''' returns a list of tick tuples (cdt, last) for specified day '''
""" returns a list of tick tuples (cdt, last) for specified day """
fiid = "846900"
year = tick_date.strftime("%Y")
yyyymmdd = tick_date.strftime("%Y%m%d")
@@ -98,7 +98,7 @@ def tdl(tick_date):
return (x, y, v)
def interpolate_line(xs, ys):
'''Fit a straight line y = bx + a to a set of points (x, y) '''
"""Fit a straight line y = bx + a to a set of points (x, y) """
# from two data points only!
x1, x2 = xs
y1, y2 = ys
@@ -115,7 +115,7 @@ def num2sod(x):
return frac * 86400
class Lohi:
'''Time series online low and high detector.'''
"""Time series online low and high detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
@@ -127,14 +127,14 @@ class Lohi:
self.highs = [ ]
def __call__(self, tick):
'''Add extended tick to the max min parser.
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
'''
"""
n, cdt, last = tick
res = None
# automatic initialisation
@@ -196,7 +196,7 @@ def find_lows_highs(xs, ys):
class DelayedAcp:
'''Time series max & min detector.'''
"""Time series max & min detector."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
@@ -207,14 +207,14 @@ class DelayedAcp:
self.highs = [ ]
def __call__(self, tick):
'''Add extended tick to the max min parser.
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
'''
"""
n, cdt, last = tick
res = None
# automatic initialisation
@@ -253,68 +253,98 @@ class DelayedAcp:
return (cmp(self.trend, 0), res)
class TopDownLoHi:
'''Douglas-Peucker algorithm.'''
class SlidingWindow:
"""Douglas-Peucker algorithm."""
def __init__(self, bias):
assert(bias > 0)
self.bias = bias
self.xs = [ ]
self.ys = [ ]
self.seg0 = 0
self.lohis = [ ]
self.lows = [ ]
self.highs = [ ]
self.segx = [ ]
self.segy = [ ]
self.types = [ ]
self.bs = [ ]
def __call__(self, tick):
'''Add extended tick to the max min parser.
"""Add extended tick to the max min parser.
@param tick: The value of the current tick.
@type tick: tuple(n, cdt, last)
@return: 1. Tick if new max min has been detected,
2. None otherwise.
'''
"""
n, cdt, last = tick
max_distance = self.bias
segment_added = False
self.xs.append(cdt)
self.ys.append(last)
x0, y0 = (self.xs[0], self.ys[0])
x1, y1 = (self.xs[-1], self.ys[-1])
if n == 0:
self.segx.append(x0)
self.segy.append(y0)
if len(self.xs) < 2:
return None
n0 = self.seg0
n1 = len(self.xs)-1
max_distance = self.bias
x0, y0 = (self.xs[n0], self.ys[n0])
x1, y1 = (self.xs[n1], self.ys[n1])
if n1 > n0:
# check distance
coefs = interpolate_line((x0, x1), (y0, y1))
ly2s = np.polyval(coefs, self.xs[n0:n1])
lys = self.ys[n0:n1]
ldiffs = np.absolute(lys - ly2s)
if np.amax(ldiffs) > max_distance:
for n, d in enumerate(ldiffs):
if d > max_distance:
n2 = n0 + n
x2, y2 = (self.xs[n2], self.ys[n2])
self.seg.set_data((x0, x2), (y0, y2))
self.segs.append(self.seg)
# start a new line segment
self.n0 = n2
x0, y0 = (self.xs[n0], self.ys[n0])
coefs = interpolate_line((x0, x1), (y0, y1))
self.seg, = self.ax1.plot_date((x0, x1), (y0, y1), 'k-')
break
return (self.segx, self.segy, segment_added)
# check distance
coefs = interpolate_line((x0, x1), (y0, y1))
ip_ys = np.polyval(coefs, self.xs)
d_ys = np.absolute(self.ys - ip_ys)
d_max = np.amax(d_ys)
if d_max > max_distance:
n = np.argmax(d_ys)
x2, y2 = (self.xs[n], self.ys[n])
self.segx.append(x2)
self.segy.append(y2)
segment_added = True
# store slope of segment
b0, a0 = interpolate_line((x0, x2), (y0, y2))
self.bs.append(b0)
# remove ticks of previous segment
del self.xs[0:n]
del self.ys[0:n]
# slope of current segment
x0, y0 = (self.xs[0], self.ys[0])
b1, a1 = interpolate_line((x0, x1), (y0, y1))
self.add_type(b0, b1)
return (self.segx + [x1], self.segy + [y1], segment_added)
def on_segment(self):
''' calculate gearing
def add_type(self, b0, b1):
""" calculate gearing
y: previous slope, x: current slope
<0 ~0 >0
<0 L L L
~0 H 0 L
>0 H H H
'''
pass
"""
if b0 < -SMALL and b1 < -SMALL and b0 > b1:
type = "H"
elif b0 < -SMALL and b1 < -SMALL and b0 < b1:
type = "L"
elif b0 < -SMALL and abs(b1) < SMALL:
type = "L"
elif b0 < -SMALL and b1 > SMALL:
type = "L"
elif abs(b0) < SMALL and b1 < -SMALL:
type = "H"
elif abs(b0) < SMALL and abs(b1) < SMALL:
type = "0"
elif abs(b0) < SMALL and b1 > SMALL:
type = "L"
elif b0 > SMALL and b1 < -SMALL:
type = "H"
elif b0 > SMALL and abs(b1) < SMALL:
type = "H"
elif b0 > SMALL and b1 > SMALL and b0 > b1:
type = "H"
elif b0 > SMALL and b1 > SMALL and b0 < b1:
type = "L"
else:
type = "?"
self.types.append(type)
SMALL = 1E-10
class Main:
@@ -374,6 +404,7 @@ class Main:
self.gs = [ 0 ] * len(self.xs)
self.mmh = Lohi(5)
self.osw = SlidingWindow(5)
self.w0 = 0
self.wd = 2000
@@ -428,17 +459,12 @@ class Main:
# prepare timeline window
xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd)
while self.low_high_crs < self.w0 + self.wd:
self.mark_low_high(self.low_high_crs)
self.lin_seg(self.low_high_crs)
# self.mark_low_high(self.low_high_crs)
self.mark_segments(self.low_high_crs)
self.ma(self.low_high_crs, 10)
self.low_high_crs += 1
# update tick line
self.tl.set_data(xr, yr)
# upadte linear segment
n0, n1 = (self.n0, self.low_high_crs)
x0, y0 = (self.xs[n0], self.ys[n0])
x1, y1 = (self.xs[n1], self.ys[n1])
self.seg.set_data((x0, x1), (y0, y1))
# update segment slope
self.sl.set_data(xr, sr)
# update volume line
@@ -477,32 +503,30 @@ class Main:
self.mas[n0] = np.average(self.ys[n0-min*60:n0])
self.gs[n0] = self.ys[n0] - self.mas[n0] + self.ss[n0]
def lin_seg(self, n1):
max_distance = 5
n0 = self.n0
x0, y0 = (self.xs[n0], self.ys[n0])
x1, y1 = (self.xs[n1], self.ys[n1])
self.seg.set_data((x0, x1), (y0, y1))
if n1 > n0:
# check distance
coefs = interpolate_line((x0, x1), (y0, y1))
ly2s = np.polyval(coefs, self.xs[n0:n1])
self.ss[n1] = coefs[0] * ONE_MINUTE
lys = self.ys[n0:n1]
ldiffs = np.absolute(lys - ly2s)
if np.amax(ldiffs) > max_distance:
for n, d in enumerate(ldiffs):
if d > max_distance:
n2 = n0 + n
x2, y2 = (self.xs[n2], self.ys[n2])
self.seg.set_data((x0, x2), (y0, y2))
self.segs.append(self.seg)
# start a new line segment
self.n0 = n2
x0, y0 = (self.xs[n0], self.ys[n0])
coefs = interpolate_line((x0, x1), (y0, y1))
self.seg, = self.ax1.plot_date((x0, x1), (y0, y1), 'k-')
break
def mark_segments(self, n):
x = self.xs
y = self.ys
segx, segy, seg_add = self.osw((n, x[n], y[n]))
self.seg.set_data(segx, segy)
if seg_add:
text = self.osw.types[-1]
if text == "H":
fc = "green"
dy = +15
elif text == "L":
fc = "red"
dy = -15
else:
fc = "blue"
dy = +15
self.ax1.annotate(text,
xy=(segx[-2], segy[-2]),
xytext=(segx[-1], segy[-2]+dy),
arrowprops=dict(facecolor=fc,
frac=0.3,
shrink=0.1))
def mark_low_high(self, n):
x = self.xs