diff --git a/deploy.cmd b/deploy.cmd new file mode 100644 index 0000000..78efbc3 --- /dev/null +++ b/deploy.cmd @@ -0,0 +1,9 @@ +@echo off +FOR %%f IN (src\*.py) DO CALL :conv %%f +goto :EOF + +:conv +echo "Deploying %1 -> O:\User\baloan\tmp\%~n1" +copy %1 O:\User\baloan\tmp\%~n1 >nul: +c:\apps\tofrodos\fromdos O:\User\baloan\tmp\%~n1 +goto :EOF diff --git a/mpl/sw-trend1.py b/mpl/sw-trend1.py index dc62445..f1f0d52 100644 --- a/mpl/sw-trend1.py +++ b/mpl/sw-trend1.py @@ -1,13 +1,13 @@ # Copyright (c) 2009 Andreas Balogh # See LICENSE for details. -''' -Online bottom-up +""" +Online sliding window with trend analysis 1. segment tick data with a sliding window alogrithm 2. recognise low/high points by comparing slope information 3. recognise trend by observing low/high point difference -''' +""" # system imports @@ -50,7 +50,7 @@ MDF_REO = re.compile("(..):(..):(..)\.*(\d+)*") def tdl(tick_date): - ''' returns a list of tick tuples (cdt, last) for specified day ''' + """ returns a list of tick tuples (cdt, last) for specified day """ fiid = "846900" year = tick_date.strftime("%Y") yyyymmdd = tick_date.strftime("%Y%m%d") @@ -98,7 +98,7 @@ def tdl(tick_date): return (x, y, v) def interpolate_line(xs, ys): - '''Fit a straight line y = bx + a to a set of points (x, y) ''' + """Fit a straight line y = bx + a to a set of points (x, y) """ # from two data points only! x1, x2 = xs y1, y2 = ys @@ -115,7 +115,7 @@ def num2sod(x): return frac * 86400 class Lohi: - '''Time series online low and high detector.''' + """Time series online low and high detector.""" def __init__(self, bias): assert(bias > 0) self.bias = bias @@ -127,14 +127,14 @@ class Lohi: self.highs = [ ] def __call__(self, tick): - '''Add extended tick to the max min parser. + """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. - ''' + """ n, cdt, last = tick res = None # automatic initialisation @@ -196,7 +196,7 @@ def find_lows_highs(xs, ys): class DelayedAcp: - '''Time series max & min detector.''' + """Time series max & min detector.""" def __init__(self, bias): assert(bias > 0) self.bias = bias @@ -207,14 +207,14 @@ class DelayedAcp: self.highs = [ ] def __call__(self, tick): - '''Add extended tick to the max min parser. + """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(n, cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. - ''' + """ n, cdt, last = tick res = None # automatic initialisation @@ -253,68 +253,98 @@ class DelayedAcp: return (cmp(self.trend, 0), res) -class TopDownLoHi: - '''Douglas-Peucker algorithm.''' +class SlidingWindow: + """Douglas-Peucker algorithm.""" def __init__(self, bias): assert(bias > 0) self.bias = bias self.xs = [ ] self.ys = [ ] - self.seg0 = 0 - self.lohis = [ ] - self.lows = [ ] - self.highs = [ ] + self.segx = [ ] + self.segy = [ ] + self.types = [ ] + self.bs = [ ] def __call__(self, tick): - '''Add extended tick to the max min parser. + """Add extended tick to the max min parser. @param tick: The value of the current tick. @type tick: tuple(n, cdt, last) @return: 1. Tick if new max min has been detected, 2. None otherwise. - ''' + """ n, cdt, last = tick + max_distance = self.bias + segment_added = False self.xs.append(cdt) self.ys.append(last) + x0, y0 = (self.xs[0], self.ys[0]) + x1, y1 = (self.xs[-1], self.ys[-1]) + if n == 0: + self.segx.append(x0) + self.segy.append(y0) if len(self.xs) < 2: - return None - n0 = self.seg0 - n1 = len(self.xs)-1 - max_distance = self.bias - x0, y0 = (self.xs[n0], self.ys[n0]) - x1, y1 = (self.xs[n1], self.ys[n1]) - if n1 > n0: - # check distance - coefs = interpolate_line((x0, x1), (y0, y1)) - ly2s = np.polyval(coefs, self.xs[n0:n1]) - lys = self.ys[n0:n1] - ldiffs = np.absolute(lys - ly2s) - if np.amax(ldiffs) > max_distance: - for n, d in enumerate(ldiffs): - if d > max_distance: - n2 = n0 + n - x2, y2 = (self.xs[n2], self.ys[n2]) - self.seg.set_data((x0, x2), (y0, y2)) - self.segs.append(self.seg) - # start a new line segment - self.n0 = n2 - x0, y0 = (self.xs[n0], self.ys[n0]) - coefs = interpolate_line((x0, x1), (y0, y1)) - self.seg, = self.ax1.plot_date((x0, x1), (y0, y1), 'k-') - break - - + return (self.segx, self.segy, segment_added) + # check distance + coefs = interpolate_line((x0, x1), (y0, y1)) + ip_ys = np.polyval(coefs, self.xs) + d_ys = np.absolute(self.ys - ip_ys) + d_max = np.amax(d_ys) + if d_max > max_distance: + n = np.argmax(d_ys) + x2, y2 = (self.xs[n], self.ys[n]) + self.segx.append(x2) + self.segy.append(y2) + segment_added = True + # store slope of segment + b0, a0 = interpolate_line((x0, x2), (y0, y2)) + self.bs.append(b0) + # remove ticks of previous segment + del self.xs[0:n] + del self.ys[0:n] + # slope of current segment + x0, y0 = (self.xs[0], self.ys[0]) + b1, a1 = interpolate_line((x0, x1), (y0, y1)) + self.add_type(b0, b1) + return (self.segx + [x1], self.segy + [y1], segment_added) - def on_segment(self): - ''' calculate gearing + def add_type(self, b0, b1): + """ calculate gearing y: previous slope, x: current slope <0 ~0 >0 <0 L L L ~0 H 0 L >0 H H H - ''' - pass + """ + if b0 < -SMALL and b1 < -SMALL and b0 > b1: + type = "H" + elif b0 < -SMALL and b1 < -SMALL and b0 < b1: + type = "L" + elif b0 < -SMALL and abs(b1) < SMALL: + type = "L" + elif b0 < -SMALL and b1 > SMALL: + type = "L" + elif abs(b0) < SMALL and b1 < -SMALL: + type = "H" + elif abs(b0) < SMALL and abs(b1) < SMALL: + type = "0" + elif abs(b0) < SMALL and b1 > SMALL: + type = "L" + elif b0 > SMALL and b1 < -SMALL: + type = "H" + elif b0 > SMALL and abs(b1) < SMALL: + type = "H" + elif b0 > SMALL and b1 > SMALL and b0 > b1: + type = "H" + elif b0 > SMALL and b1 > SMALL and b0 < b1: + type = "L" + else: + type = "?" + self.types.append(type) + + +SMALL = 1E-10 class Main: @@ -374,6 +404,7 @@ class Main: self.gs = [ 0 ] * len(self.xs) self.mmh = Lohi(5) + self.osw = SlidingWindow(5) self.w0 = 0 self.wd = 2000 @@ -428,17 +459,12 @@ class Main: # prepare timeline window xr, yr, vr, sr, gr = self.tick_window(self.w0, self.wd) while self.low_high_crs < self.w0 + self.wd: - self.mark_low_high(self.low_high_crs) - self.lin_seg(self.low_high_crs) + # self.mark_low_high(self.low_high_crs) + self.mark_segments(self.low_high_crs) self.ma(self.low_high_crs, 10) self.low_high_crs += 1 # update tick line self.tl.set_data(xr, yr) - # upadte linear segment - n0, n1 = (self.n0, self.low_high_crs) - x0, y0 = (self.xs[n0], self.ys[n0]) - x1, y1 = (self.xs[n1], self.ys[n1]) - self.seg.set_data((x0, x1), (y0, y1)) # update segment slope self.sl.set_data(xr, sr) # update volume line @@ -477,32 +503,30 @@ class Main: self.mas[n0] = np.average(self.ys[n0-min*60:n0]) self.gs[n0] = self.ys[n0] - self.mas[n0] + self.ss[n0] - def lin_seg(self, n1): - max_distance = 5 - n0 = self.n0 - x0, y0 = (self.xs[n0], self.ys[n0]) - x1, y1 = (self.xs[n1], self.ys[n1]) - self.seg.set_data((x0, x1), (y0, y1)) - if n1 > n0: - # check distance - coefs = interpolate_line((x0, x1), (y0, y1)) - ly2s = np.polyval(coefs, self.xs[n0:n1]) - self.ss[n1] = coefs[0] * ONE_MINUTE - lys = self.ys[n0:n1] - ldiffs = np.absolute(lys - ly2s) - if np.amax(ldiffs) > max_distance: - for n, d in enumerate(ldiffs): - if d > max_distance: - n2 = n0 + n - x2, y2 = (self.xs[n2], self.ys[n2]) - self.seg.set_data((x0, x2), (y0, y2)) - self.segs.append(self.seg) - # start a new line segment - self.n0 = n2 - x0, y0 = (self.xs[n0], self.ys[n0]) - coefs = interpolate_line((x0, x1), (y0, y1)) - self.seg, = self.ax1.plot_date((x0, x1), (y0, y1), 'k-') - break + def mark_segments(self, n): + x = self.xs + y = self.ys + segx, segy, seg_add = self.osw((n, x[n], y[n])) + self.seg.set_data(segx, segy) + if seg_add: + text = self.osw.types[-1] + if text == "H": + fc = "green" + dy = +15 + elif text == "L": + fc = "red" + dy = -15 + else: + fc = "blue" + dy = +15 + self.ax1.annotate(text, + xy=(segx[-2], segy[-2]), + xytext=(segx[-1], segy[-2]+dy), + arrowprops=dict(facecolor=fc, + frac=0.3, + shrink=0.1)) + + def mark_low_high(self, n): x = self.xs diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..0125816 --- /dev/null +++ b/src/main.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +# Copyright (c) 2009 Andreas Balogh +# See LICENSE for details. + +""" new module template """ + +# system imports + +import logging +import sys +import os +import getopt + +# local imports + +# constants + +# globals + +LOG = logging.getLogger() + +logging.basicConfig(level=logging.DEBUG, + format="%(asctime)s %(levelname).3s %(process)d:%(thread)d %(message)s", + datefmt="%H:%M:%S") + +# definitions + +class Usage(Exception): + pass + +class Error(Exception): + pass + +def main(argv = [__name__]): + try: + # check for parameters + LOG.debug("starting '%s %s'", argv[0], " ".join(argv[1:])) + script_name = os.path.basename(argv[0]) + try: + opts, args = getopt.getopt(argv[1:], "hfgp", \ + ["help", "force", "gui", "preview"]) + except getopt.error, err: + raise Usage(err) + LOG.debug("opts: %s, args: %s", opts, args) + o_overwrite = False + o_gui = False + o_preview = False + for o, a in opts: + if o in ("-h", "--help"): + usage(script_name) + return 0 + elif o in ("-f", "--force"): + o_overwrite = True + elif o in ("-p", "--preview"): + o_preview = True + elif o in ("-g", "--gui"): + o_gui = True + if len(args) == 2: + src_dir = args[0] + dest_dir = args[1] + elif len(args) == 1 : + src_dir = args[0] + dest_dir = args[0] + elif len(args) == 0 : + src_dir = None + dest_dir = None + o_gui = True + else: + raise Usage("more than two arguments provided") + # call method with appropriate arguments + if src_dir and not os.path.exists(src_dir): + raise Error("Source directory not found [%s], aborting" % (src_dir, )) + if dest_dir and not os.path.exists(dest_dir): + LOG.warn("Destination directory not found [%s]", dest_dir) + if not o_preview: + LOG.info("Creating destination directory [%s]", dest_dir) + os.makedirs(dest_dir) + if o_gui: + gui(src_dir, dest_dir, o_overwrite) + else: + cli(src_dir, dest_dir, o_preview, o_overwrite) + LOG.debug("Done.") + return 0 + except Error, err: + LOG.error(err) + return 1 + except Usage, err: + LOG.error(err) + LOG.info("for usage use -h or --help") + return 2 + + +def gui(src_dir, dest_dir, o_overwrite): + """ graphical user interface """ + print src_dir, dest_dir, o_overwrite + + +def cli(src_dir, dest_dir, o_preview, o_overwrite): + """ command line interface """ + print src_dir, dest_dir, o_preview, o_overwrite + + +def usage(script_name): + print + print "usage: %s [options] [src_dir [dest_dir]]" % (script_name,) + print """ + src_dir source directory to search for MOD/MOI + dest_dir destination directory for MPG files +options: + -h, --help show this help message and exit + -f, --force override files with same name in destination directory + -g, --gui force interactive mode + -p, --preview preview only, don't copy, don't create non-existent directories +""" + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) + \ No newline at end of file