# src/SimpleObjectDistribution.py
# Client-side object adaptor plus server-side dispatcher for a tiny
# pickle-over-TCP remote invocation scheme (the send()/receive() helpers
# later in this file implement the wire format).

# Object adaptor for the client
import communication
from socket import *

class ObjectAdaptor(object):
    """Client-side proxy: opens one TCP connection per call and forwards
    the method name and arguments to a remote ObjectServer."""

    def __init__(self, ip, port):
        # Address of the remote ObjectServer; no connection is made yet.
        self.ip = ip
        self.port = port

    def send(self, *args):
        # Marshal and ship the positional args over the open channel.
        communication.send(self.channel, args)

    def receive(self):
        # Blocking read of one marshalled reply from the channel.
        return communication.receive(self.channel)

    def remoteInvoke(self, fun, *args):
        """Invoke `fun` remotely (one TCP connection per invocation).

        `fun` is a local function whose name identifies the remote
        method; returns the remote call's result.
        """
        method = (fun.func_name, ) + args
        self.channel = socket(AF_INET, SOCK_STREAM, 0)
        self.channel.connect((self.ip, self.port))
        try:
            self.send(*method)
            result = self.receive()
        finally:
            self.channel.close()
        return result[0]

# Object server for the actual object
import communication
import types
from socket import *

class ObjectServer(object):
    """Server side: accepts connections and dispatches (name, args...)
    requests to plain functions defined on the concrete subclass."""

    def __init__(self, ip, port):
        self.channel = socket(AF_INET, SOCK_STREAM, 0)
        self.channel.bind((ip, port))
        self.channel.listen(50)
        # Actual bound address (useful when port 0 was requested).
        self.info = self.channel.getsockname()

    def send(self, client, *args):
        communication.send(client, args)

    def receive(self, client):
        return communication.receive(client)

    def getInfo(self):
        return self.info

    def dispatch(self, invoke, client):
        """Look up invoke[0] on this class and call it with invoke[1:].

        BUGFIX: the original sent no reply for an unknown method name,
        leaving the client blocked in receive() forever; a None result
        is now sent instead.  Also stopped shadowing the builtin `dict`.
        """
        methods = self.__class__.__dict__
        name = invoke[0]
        if name in methods and type(methods[name]) == types.FunctionType:
            method = methods[name]
            params = invoke[1:]
            result = method(self, *params)
        else:
            # Unknown method: reply anyway so the client unblocks.
            result = None
        self.send(client, result)

    def start(self):
        """Accept-dispatch loop; serves one request per connection."""
        while True:
            client = self.channel.accept()[0]
            try:
                invoke = self.receive(client)
                self.dispatch(invoke, client)
            finally:
                # BUGFIX: the original never closed accepted sockets,
                # leaking one file descriptor per request.
                client.close()

# Communication code (the `import communication` statements above)
from socket import htonl, ntohl
import cPickle
import struct

marshall = cPickle.dumps
unmarshall = cPickle.loads


def send(channel, *args):
    """Pickle `args` and send it length-prefixed over `channel`.

    The prefix is a native-size "L" carrying the htonl'd length;
    receive() unpacks it symmetrically on the same platform.
    """
    buf = marshall(args)
    value = htonl(len(buf))
    size = struct.pack("L", value)
    # BUGFIX: use sendall -- plain send() may transmit only part of the
    # buffer, silently corrupting the stream.
    channel.sendall(size)
    channel.sendall(buf)
+ +def receive(channel): + size = struct.calcsize("L") + size = channel.recv(size) + size = ntohl(struct.unpack("L", size)[0]) + buf = "" + while len(buf) < size: + buf = channel.recv(size - len(buf)) + return unmarshall(buf)[0] + +# Echo server sample +class EchoServer(ObjectServer): + def __init__(self, ip, port): + ObjectServer.__init__(self, ip, port) + def echo(self, msg): + return "Message received: %s" % msg +es = EchoServer("127.0.0.1", 10000) +es.start() + +# Echo client sample +class EchoClient(ObjectAdaptor): + def __init__(self, ip, port): + ObjectAdaptor.__init__(self, ip, port) + def echo(self, msg): + return self.remoteInvoke(self.echo, msg) +ec = EchoClient("127.0.0.1", 10000) +print ec.echo("Hello World!") diff --git a/src/bag.py b/src/bag.py new file mode 100644 index 0000000..81101f1 --- /dev/null +++ b/src/bag.py @@ -0,0 +1,96 @@ +from operator import itemgetter +from heapq import nlargest + +class bag(object): + + def __init__(self, iterable=()): + self._data = {} + self._len = 0 + self.update(iterable) + + def update(self, iterable): + if isinstance(iterable, dict): + for elem, n in iterable.iteritems(): + self[elem] += n + else: + for elem in iterable: + self[elem] += 1 + + def __contains__(self, elem): + return elem in self._data + + def __getitem__(self, elem): + return self._data.get(elem, 0) + + def __setitem__(self, elem, n): + self._len += n - self[elem] + self._data[elem] = n + if n == 0: + del self._data[elem] + + def __delitem__(self, elem): + self._len -= self[elem] + del self._data[elem] + + def __len__(self): + assert self._len == sum(self._data.itervalues()) + return self._len + + def __eq__(self, other): + if not isinstance(other, bag): + return False + return self._data == other._data + + def __ne__(self, other): + if not isinstance(other, bag): + return True + return self._data != other._data + + def __hash__(self): + raise TypeError + + def __repr__(self): + return 'bag(%r)' % self._data + + def copy(self): + return 
self.__class__(self) + + __copy__ = copy # For the copy module + + def __deepcopy__(self, memo): + from copy import deepcopy + result = self.__class__() + memo[id(self)] = result + data = result._data + result._data = deepcopy(self._data, memo) + result._len = self._len + return result + + def __getstate__(self): + return self._data.copy(), self._len + + def __setstate__(self, data): + self._data = data[0].copy() + self._len = data[1] + + def clear(self): + self._data.clear() + self._len = 0 + + def __iter__(self): + for elem, cnt in self._data.iteritems(): + for i in xrange(cnt): + yield elem + + def iterunique(self): + return self._data.iterkeys() + + def itercounts(self): + return self._data.iteritems() + + def mostcommon(self, n=None): + if n is None: + return sorted(self.itercounts(), key=itemgetter(1), reverse=True) + it = enumerate(self.itercounts()) + nl = nlargest(n, ((cnt, i, elem) for (i, (elem, cnt)) in it)) + return [(elem, cnt) for cnt, i, elem in nl] \ No newline at end of file diff --git a/src/enum.py b/src/enum.py new file mode 100644 index 0000000..aafecf4 --- /dev/null +++ b/src/enum.py @@ -0,0 +1,52 @@ +def Enum(*names): + ##assert names, "Empty enums are not supported" # <- Don't like empty enums? Uncomment! 
+ + class EnumClass(object): + __slots__ = names + def __iter__(self): return iter(constants) + def __len__(self): return len(constants) + def __getitem__(self, i): return constants[i] + def __repr__(self): return 'Enum' + str(names) + def __str__(self): return 'enum ' + str(constants) + + class EnumValue(object): + __slots__ = ('__value') + def __init__(self, value): self.__value = value + Value = property(lambda self: self.__value) + EnumType = property(lambda self: EnumType) + def __hash__(self): return hash(self.__value) + def __cmp__(self, other): + # C fans might want to remove the following assertion + # to make all enums comparable by ordinal value {;)) + assert self.EnumType is other.EnumType, "Only values from the same enum are comparable" + return cmp(self.__value, other.__value) + def __invert__(self): return constants[maximum - self.__value] + def __nonzero__(self): return bool(self.__value) + def __repr__(self): return str(names[self.__value]) + + maximum = len(names) - 1 + constants = [None] * len(names) + for i, each in enumerate(names): + val = EnumValue(i) + setattr(EnumClass, each, val) + constants[i] = val + constants = tuple(constants) + EnumType = EnumClass() + return EnumType + + +if __name__ == '__main__': + print '\n*** Enum Demo ***' + print '--- Days of week ---' + Days = Enum('Mo', 'Tu', 'We', 'Th', 'Fr', 'Sa', 'Su') + print Days + print Days.Mo + print Days.Fr + print Days.Mo < Days.Fr + print list(Days) + for each in Days: + print 'Day:', each + print '--- Yes/No ---' + Confirmation = Enum('No', 'Yes') + answer = Confirmation.No + print 'Your answer is not', ~answer \ No newline at end of file diff --git a/src/grabYahooDataMt.py b/src/grabYahooDataMt.py new file mode 100644 index 0000000..ab5ae89 --- /dev/null +++ b/src/grabYahooDataMt.py @@ -0,0 +1,158 @@ +#! 
/usr/bin/env python +# -*- coding: utf-8 -*- + + +__author__ = 'gian paolo ciceri ' +__version__ = '0.1' +__date__ = '20070401' +__credits__ = "queue and MT code was shamelessly stolen from pycurl example retriever-multi.py" + +# +# Usage: python grabYahooDataMt.py -h +# +# +# for selecting tickers and starting date it uses an input file of this format +# +# like +# ^GSPC 19500103 # S&P 500 +# ^N225 19840104 # Nikkei 225 + +import sys, threading, Queue, datetime +import urllib +from optparse import OptionParser + + +# this thread ask the queue for job and does it! +class WorkerThread(threading.Thread): + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + + def run(self): + while 1: + try: + # fetch a job from the queue + ticker, fromdate, todate = self.queue.get_nowait() + except Queue.Empty: + raise SystemExit + if ticker[0] == "^": + tick = ticker[1:] + else: + tick = ticker + filename = downloadTo + "%s_%s.csv" % (tick, todate) + fp = open(filename, "wb") + if options.verbose: + print "last date asked:", todate, todate[0:4], todate[4:6], todate[6:8] + print "first date asked:", fromdate, fromdate[0:4], fromdate[4:6], fromdate[6:8] + quote = dict() + quote['s'] = ticker + quote['d'] = str(int(todate[4:6]) - 1) + quote['e'] = str(int(todate[6:8])) + quote['f'] = str(int(todate[0:4])) + quote['g'] = "d" + quote['a'] = str(int(fromdate[4:6]) - 1) + quote['b'] = str(int(fromdate[6:8])) + quote['c'] = str(int(fromdate[0:4])) + #print quote + params = urllib.urlencode(quote) + params += "&ignore=.csv" + + url = "http://ichart.yahoo.com/table.csv?%s" % params + if options.verbose: + print "fetching:", url + try: + f = urllib.urlopen(url) + fp.write(f.read()) + except: + import traceback + traceback.print_exc(file=sys.stderr) + sys.stderr.flush() + fp.close() + if options.verbose: + print url, "...fetched" + else: + sys.stdout.write(".") + sys.stdout.flush() + + + +if __name__ == '__main__': + + # today is + today = 
datetime.datetime.now().strftime("%Y%m%d") + + # parse arguments + parser = OptionParser() + parser.add_option("-f", "--file", dest="tickerfile", action="store", default = "./tickers.txt", + help="read ticker list from file, it uses ./tickers.txt as default") + parser.add_option("-c", "--concurrent", type="int", dest="connections", default = 10, action="store", + help="# of concurrent connections") + parser.add_option("-d", "--dir", dest="downloadTo", action="store", default = "./rawdata/", + help="save date to this directory, it uses ./rawdata/ as default") + + parser.add_option("-t", "--todate", dest="todate", default = today, action="store", + help="most recent date needed") + parser.add_option("-v", "--verbose", + action="store_true", dest="verbose") + parser.add_option("-q", "--quiet", + action="store_false", dest="verbose") + (options, args) = parser.parse_args() + + + tickerfile = options.tickerfile + downloadTo = options.downloadTo + connections = options.connections + today = options.todate + + + # get input list + try: + tickers = open(tickerfile).readlines() + except: + parser.error("ticker file %s not found" % (tickerfile,)) + raise SystemExit + + + # build a queue with (ticker, fromdate, todate) tuples + queue = Queue.Queue() + for tickerRow in tickers: + #print tickerRow + tickerRow = tickerRow.strip() + if not tickerRow or tickerRow[0] == "#": + continue + tickerSplit = tickerRow.split() + # ticker, fromdate, todate + queue.put((tickerSplit[0], tickerSplit[1], today)) + + + + + # Check args + assert queue.queue, "no Tickers given" + numTickers = len(queue.queue) + connections = min(connections, numTickers) + assert 1 <= connections <= 255, "too much concurrent connections asked" + + + if options.verbose: + print "----- Getting", numTickers, "Tickers using", connections, "simultaneous connections -----" + + + # start a bunch of threads, passing them the queue of jobs to do + threads = [] + for dummy in range(connections): + t = WorkerThread(queue) + 
t.start() + threads.append(t) + + + # wait for all threads to finish + for thread in threads: + thread.join() + sys.stdout.write("\n") + sys.stdout.flush() + + # tell something to the user before exiting + if options.verbose: + print "all threads are finished - goodbye." + \ No newline at end of file diff --git a/src/heapq.py b/src/heapq.py new file mode 100644 index 0000000..7629327 --- /dev/null +++ b/src/heapq.py @@ -0,0 +1,129 @@ +import heapq + +class Heap(list): + """This is a wrapper class for the heap functions provided by the heapq module """ + __slots__ = () + + def __init__(self, t=[]): + self.extend(t) + self.heapify() + + push = heapq.heappush + popmin = heapq.heappop + replace = heapq.heapreplace + heapify = heapq.heapify + + def pushpop(self, item): + "Push the item onto the heap and then pop the smallest value" + if self and self[0] < item: + return heapq.heapreplace(self, item) + return item + + def __iter__(self): + "Return a destructive iterator over the heap's elements" + try: + while True: + yield self.popmin() + except IndexError: + pass + + def reduce(self, pos, newitem): + "Replace self[pos] with a lower value item and then reheapify" + while pos > 0: + parentpos = (pos - 1) >> 1 + parent = self[parentpos] + if parent <= newitem: + break + self[pos] = parent + pos = parentpos + self[pos] = newitem + + def is_heap(self): + "Return True if the heap has the heap property; False otherwise" + n = len(self) + # The largest index there's any point to looking at + # is the largest with a child index in-range, so must have 2*i + 1 < n, + # or i < (n-1)/2. If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so + # j-1 is the largest, which is n//2 - 1. If n is odd = 2*j+1, this is + # (2*j+1-1)/2 = j so j-1 is the largest, and that's again n//2-1. 
+ try: + for i in xrange(n//2): + if self[i] > self[2*i+1]: return False + if self[i] > self[2*i+2]: return False + except IndexError: + pass + return True + + +def heapsort(seq): + return [x for x in Heap(seq)] + + +if __name__ == '__main__': + from random import randint, shuffle + + # generate a random test case + n = 15 + data = [randint(1,n) for i in xrange(n)] + shuffle(data) + print data + + # test the constructor + heap = Heap(data) + print heap, heap.is_heap() + + # test popmin + sorted = [] + while heap: + sorted.append(heap.popmin()) + + data.sort() + print heap, heap.is_heap() + print data == sorted + + # test 2 + shuffle(data) + print data + + # test push + for item in data: + heap.push(item) + print heap, heap.is_heap() + + # test __iter__ + sorted = [x for x in heap] + + data.sort() + print data == sorted + + # test 3 + shuffle(data) + print data + heap = Heap(data) + print heap, heap.is_heap() + + # test reduce + for i in range(5): + pos = randint(0,n-1) + decr = randint(1,10) + item = heap[pos] - decr + heap.reduce(pos, item) + + # test is_heap + heap = Heap(data) + count = 0 + while 1: + shuffle(heap) + if heap.is_heap(): + print heap + break + else: + count += 1 + print 'It took', count, 'tries to find a heap by chance.' + + print heapsort(data) + + try: + heap.x = 5 + except AttributeError: + print "Can't add attributes." 
\ No newline at end of file diff --git a/src/proxy_server.py b/src/proxy_server.py new file mode 100644 index 0000000..6a456a9 --- /dev/null +++ b/src/proxy_server.py @@ -0,0 +1,83 @@ +import asynchat +import asyncore +import socket +import string + +class proxy_server (asyncore.dispatcher): + + def __init__ (self, host, port): + asyncore.dispatcher.__init__ (self) + self.create_socket (socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.there = (host, port) + here = ('', port + 8000) + self.bind (here) + self.listen (5) + + def handle_accept (self): + proxy_receiver (self, self.accept()) + +class proxy_sender (asynchat.async_chat): + + def __init__ (self, receiver, address): + asynchat.async_chat.__init__ (self) + self.receiver = receiver + self.set_terminator (None) + self.create_socket (socket.AF_INET, socket.SOCK_STREAM) + self.buffer = '' + self.set_terminator ('\n') + self.connect (address) + + def handle_connect (self): + print 'Connected' + + def collect_incoming_data (self, data): + self.buffer = self.buffer + data + + def found_terminator (self): + data = self.buffer + self.buffer = '' + print '==> (%d) %s' % (self.id, repr(data)) + self.receiver.push (data + '\n') + + def handle_close (self): + self.receiver.close() + self.close() + +class proxy_receiver (asynchat.async_chat): + + channel_counter = 0 + + def __init__ (self, server, (conn, addr)): + asynchat.async_chat.__init__ (self, conn) + self.set_terminator ('\n') + self.server = server + self.id = self.channel_counter + self.channel_counter = self.channel_counter + 1 + self.sender = proxy_sender (self, server.there) + self.sender.id = self.id + self.buffer = '' + + def collect_incoming_data (self, data): + self.buffer = self.buffer + data + + def found_terminator (self): + data = self.buffer + self.buffer = '' + print '<== (%d) %s' % (self.id, repr(data)) + self.sender.push (data + '\n') + + def handle_close (self): + print 'Closing' + self.sender.close() + self.close() + +if __name__ 
== '__main__': + import sys + import string + if len(sys.argv) < 3: + print 'Usage: %s ' % sys.argv[0] + else: + ps = proxy_server (sys.argv[1], string.atoi (sys.argv[2])) + asyncore.loop() + \ No newline at end of file