Here are some indicators I made to practice Python and to get used to Quantopian API. I tested them on daily and they seem to be correct when I verify them against Thinkorswim's values. Any advice on how to write better code is greatly appreciated.
from collections import deque
from datetime import timedelta
import numpy as np
import math
R_P = 0
W_L = 200
class AroonIndicator(object):
def __init__(self, length=60):
self.length = length
def get(self, datapanel, context):
if datapanel is not None:
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
if len(highs) >= self.length and len(lows) >= self.length:
ui = getMaxValueOffset(highs[-self.length:])
di = getMinValueOffset(lows[-self.length:])
u = (self.length - ui) * 100 / self.length
d = (self.length - di) * 100 / self.length
log.info((ui, di, u, d))
return u, d
class Average(object):
def __init__(self, length=60):
self.length = length
self.values = deque(maxlen=length)
def get(self, value=None):
if value is not None:
self.values.append(value)
return sum(self.values) / self.length if len(self.values) == self.length else None
class AverageDirectionalIndex(object):
def __init__(self, length=60):
self.apdm = Average(length)
self.amdm = Average(length)
self.atr = Average(length)
self.adx = Average(length)
self.length = length
def get(self, datapanel, context):
if datapanel is not None:
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
closes = datapanel['price'][context.sid]
if len(highs) > 1 and len(lows) > 1:
hd = highs[-1] - highs[-2]
ld = lows[-2] - lows[-1]
pdm = hd if hd > ld and hd > 0 else 0
mdm = ld if ld > hd and ld > 0 else 0
tr = getTrueRange(highs, lows, closes)
apdm = self.apdm.get(pdm)
amdm = self.amdm.get(mdm)
atr = self.atr.get(tr)
if apdm is not None and amdm is not None and atr is not None:
dip = 100 * apdm / atr
dim = 100 * amdm / atr
dx = 100 * abs(dip - dim) / (dip + dim) if dip + dim > 0 else 0
adx = self.adx.get(dx)
return adx
class AwesomeOscillator(object):
def __init__(self):
self.avg_fast = Average(5)
self.avg_slow = Average(34)
def get(self, datapanel, context):
if datapanel is not None:
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
hl2 = getHL2(highs, lows)
avg_fast = self.avg_fast.get(hl2)
avg_slow = self.avg_slow.get(hl2)
return avg_fast - avg_slow if avg_fast is not None and avg_slow is not None else None
class ChaikinMoneyFlow(object):
def __init__(self, length=60):
self.tv = deque(maxlen=length)
self.vol = deque(maxlen=length)
self.length = length
def get(self, datapanel=None, context=None):
if datapanel is not None:
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
closes = datapanel['price'][context.sid]
volumes = datapanel['volume'][context.sid]
tv = volumes[-1] if highs[-1] == lows[-1] else (closes[-1] - lows[-1] - (highs[-1] - closes[-1])) / (highs[-1] - lows[-1]) * volumes[-1]
if tv is not None:
self.tv.appendleft(tv)
if volumes[-1] is not None:
self.vol.appendleft(volumes[-1])
if len(self.tv) != self.length or len(self.vol) != self.length:
return None
else:
sc = sum(self.tv)
ft = sum(self.vol)
return 0 if ft == 0 else sc / ft
class DemandIndex(object):
def __init__(self, length=60):
self.length = length
self.wc = deque(maxlen=2)
self.avg_hhlldiff = Average(length)
self.avg_vol = Average(length)
self.bpres = deque(maxlen=2)
self.spres = deque(maxlen=2)
def get(self, datapanel=None, context=None):
if datapanel is not None:
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
closes = datapanel['price'][context.sid]
volumes = datapanel['volume'][context.sid]
wc = (highs[-1] + lows[-1] + 2 * closes[-1]) * 0.25;
self.wc.appendleft(wc)
if len(self.wc) == 2:
wcr = (self.wc[0] - self.wc[1]) / getSmaller(self.wc[0], self.wc[1])
hh = highs[-1] if highs[-1] > highs[-2] else highs[-2]
ll = lows[-1] if lows[-1] < lows[-2] else lows[-2]
avg_hhlldiff = self.avg_hhlldiff.get(hh-ll)
avg_vol = self.avg_vol.get(volumes[-1])
if avg_hhlldiff is not None and avg_vol is not None:
cr = 3 * self.wc[0] / avg_hhlldiff * abs(wcr)
vr = volumes[-1] / avg_vol;
vpc = vr / math.exp(getSmaller(88, cr))
if wcr > 0:
bp = vr
sp = vpc
else:
bp = vpc
sp = vr
log.info((wc, wcr, cr, vr, vpc, bp, sp))
if len(self.bpres) and len(self.spres):
log.info(self.bpres)
log.info(self.spres)
self.bpres.appendleft(((self.bpres[0] * (self.length - 1)) + bp) / self.length)
self.spres.appendleft(((self.spres[0] * (self.length - 1)) + sp) / self.length)
if ((((self.spres[1] * (self.length - 1)) + sp) / self.length - ((self.bpres[1] * (self.length - 1)) + bp) / self.length) > 0):
t = -self.bpres[0] / self.spres[0] if self.spres[0] != 0 else -1
else:
t = self.spres[0] / self.bpres[0] if self.bpres[0] != 0 else 1
di = -1 - t if t < 0 else 1 - t
log.info(self.bpres)
log.info(self.spres)
log.info((t, di))
return di
else:
self.bpres.appendleft(0)
self.spres.appendleft(0)
def BalanceOfMarketPower(datapanel, context):
opens = datapanel['open_price'][context.sid]
highs = datapanel['high'][context.sid]
lows = datapanel['low'][context.sid]
closes = datapanel['price'][context.sid]
return (closes[-1] - opens[-1]) / (highs[-1] - lows[-1]) if highs[-1] != lows[-1] else 1;
def getTrueRange(highs, lows, closes):
return max((highs[-1] - lows[-1]), abs(highs[-1] - closes[-2]), abs(lows[-1] - closes[-2]))
def getOHLC4(opens, highs, lows, closes):
return (opens[-1] + highs[-1] + lows[-1] + closes[-1]) / 4
def getHL2(highs, lows):
return (highs[-1] + lows[-1]) / 2
def getMaxValueOffset(data):
maxval = data.max()
for (key, val) in enumerate(data, start=1):
if val == maxval:
#log.info((key,val))
return len(data) - key
def getMinValueOffset(data):
minval = data.min()
for (key, val) in enumerate(data, start=1):
if val == minval:
#log.info((key,val))
return len(data) - key
def getLarger(a, b):
return a if a > b else b
def getSmaller(a, b):
return a if a < b else b
@batch_transform(window_length=W_L, refresh_period=R_P, compute_only_full=False)
def getBatch(datapanel):
return datapanel
def initialize(context):
context.sid = sid(8554)
#context.invested = False
#context.ai = AroonIndicator(25)
#context.adx = AverageDirectionalIndex(14)
#context.ao = AwesomeOscillator()
#context.cmf = ChaikinMoneyFlow(21)
context.di = DemandIndex(5)
def handle_data(context, data):
datapanel = getBatch(data)
#record(adx = context.adx.get(datapanel, context))
#record(ao = context.ao.get(datapanel, context))
#record(cmf = context.cmf.get(datapanel, context))
record(di = context.di.get(datapanel, context))