# ADX (Average Directional Index) - custom factor for pipeline (WIP)
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.filters import StaticAssets
from quantopian.pipeline.factors import CustomFactor
from quantopian.research import run_pipeline
import numpy as np
def _wilder_smooth(values, period, start):
    """Apply Wilder smoothing down the rows of a 2-D array.

    Row ``start`` is seeded with the plain average of the ``period - 1``
    rows immediately before it (``values[start - period + 1:start]``).
    Every later row follows the Wilder recursion::

        smoothed[i] = (smoothed[i - 1] * (period - 1) + values[i]) / period

    The recursion depends on the previous row, so a loop over rows remains,
    but each step is vectorized across all columns instead of visiting one
    column at a time.

    Parameters
    ----------
    values : np.ndarray
        2-D array; smoothing runs along axis 0, independently per column.
    period : int
        Wilder smoothing length.
    start : int
        Index of the first row that receives a smoothed value.

    Returns
    -------
    np.ndarray
        Float array shaped like ``values``. Rows before ``start`` are NaN;
        all rows are NaN when ``values`` has ``start`` rows or fewer.
    """
    smoothed = np.full(values.shape, np.nan)
    n_rows = values.shape[0]
    if n_rows <= start:
        return smoothed
    smoothed[start] = np.average(values[start - period + 1:start], axis=0)
    for i in range(start + 1, n_rows):
        smoothed[i] = (smoothed[i - 1] * (period - 1) + values[i]) / period
    return smoothed


class ADX(CustomFactor):
    """Average Directional Index (ADX) computed with Wilder smoothing.

    The factor reads windows of high, low and close prices and reports the
    most recent ADX value for every asset column.

    Parameters
    ----------
    adx_len : int
        Wilder smoothing length (default 14). ``window_length`` should be
        about ``10 * adx_len`` so the recursive averages have time to settle.

    Notes
    -----
    * ADX is NaN when the window is too short: it needs more than
      ``2 * adx_len + 1`` rows of price data.
    * When the true range or both directional indicators are zero, the
      division yields NaN/inf, which then propagates through the ADX
      recursion for that asset.
    * To validate against a spreadsheet, temporarily assign the last row of
      an intermediate array to ``out`` (for example ``out[:] = atr[-1]``).
    """
    inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
    params = {'adx_len': 14}
    window_length = 140  # use window_length = 10 * adx_len

    def compute(self, today, assets, out, highs, lows, closes, adx_len):
        """Write the latest ADX value of each asset into ``out``.

        ``highs``, ``lows`` and ``closes`` are 2-D windows whose rows are
        consecutive bars (oldest first) and whose columns are assets.
        """
        prev_closes = closes[:-1]

        # True range: the bar's range extended to include the previous close.
        true_highs = np.maximum(highs[1:], prev_closes)
        true_lows = np.minimum(lows[1:], prev_closes)
        true_ranges = true_highs - true_lows

        # Directional movement: only the larger of the up-move / down-move
        # counts, and only when it is positive.
        high_diffs = highs[1:] - highs[:-1]  # current high - previous high
        low_diffs = lows[:-1] - lows[1:]     # previous low - current low
        plus_dm = np.where(
            (high_diffs > low_diffs) & (high_diffs > 0), high_diffs, 0.)
        minus_dm = np.where(
            (low_diffs > high_diffs) & (low_diffs > 0), low_diffs, 0.)

        # Wilder-smoothed true range and directional movement.
        atr = _wilder_smooth(true_ranges, adx_len, adx_len)
        smoothed_plus_dm = _wilder_smooth(plus_dm, adx_len, adx_len)
        smoothed_minus_dm = _wilder_smooth(minus_dm, adx_len, adx_len)

        # Directional indicators and the directional index (DX).
        plus_di = 100 * (smoothed_plus_dm / atr)
        minus_di = 100 * (smoothed_minus_dm / atr)
        dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di)

        # DX is only defined from row adx_len onward, so ADX smoothing
        # starts one further period in.
        adx = _wilder_smooth(dx, adx_len, 2 * adx_len)

        # A one-row window leaves no rows after differencing; report NaN
        # rather than failing on the index.
        out[:] = adx[-1] if adx.shape[0] else np.nan
def create_pipeline(tickers=('spy',), adx_len=7):
    """Build a pipeline reporting yesterday's close and ADX for fixed symbols.

    Parameters
    ----------
    tickers : str or sequence of str, optional
        Symbols that make up the static universe. Defaults to SPY alone,
        which keeps validation against a reference easy; pass for example
        ``('spy', 'tlt')`` to check that several symbols work.
    adx_len : int, optional
        Wilder smoothing length handed to the ADX factor. Defaults to 7
        (note that the factor's own default is 14).

    Returns
    -------
    Pipeline
        Pipeline with columns ``'yesterday_close'`` and ``'yesterday_adx'``,
        screened to the static universe.
    """
    # A bare string would otherwise be split into single characters.
    if isinstance(tickers, str):
        tickers = [tickers]
    base_universe = StaticAssets(symbols(list(tickers)))

    # Factor of yesterday's close price.
    yesterday_close = USEquityPricing.close.latest

    # Factor of yesterday's ADX; the window is ten times the smoothing
    # length so the recursive averages have time to settle.
    yesterday_adx = ADX(
        mask=base_universe,
        window_length=10 * adx_len,
        adx_len=adx_len,
    )

    return Pipeline(
        columns={
            'yesterday_close': yesterday_close,
            'yesterday_adx': yesterday_adx,
        },
        screen=base_universe,
    )
import pandas as pd

# Evaluate the pipeline over the period 11-26-2018 through 11-25-2019.
results = run_pipeline(create_pipeline(), '11-26-2018', '11-25-2019')

# Drop index level 1 (presumably the asset level - confirm against the
# pipeline output) so the frame is keyed by the remaining level alone.
flattened_index = results.index.droplevel(level=1)
results.index = flattened_index

# Notebook-style display of the last 20 rows.
results.tail(20)
import talib as ta
import matplotlib.pyplot as plt
# Daily SPY bars for the same date range as the pipeline run.
data = get_pricing('spy', '11-26-2018', '11-25-2019', frequency='daily')

# Pull out the index and the individual price columns
# (available fields: 'open_price', 'high', 'low', 'close_price').
date = data.index
openp, closep, highp, lowp = (
    data[field] for field in ('open_price', 'close_price', 'high', 'low')
)

# Reference ADX-7 from talib, stored alongside the prices.
data['ADX_talib'] = ta.ADX(
    highp.values,
    lowp.values,
    closep.values,
    timeperiod=7,
)
ADX_talib = data['ADX_talib']
#data = data[1:]

# Shift by one day to align with the pipeline output, then lift the curve
# by 0.3 so it does not sit exactly on top of the custom-factor curve.
ADX_talib_shifted = ADX_talib.shift(1)
ADX_talib_shifted = ADX_talib_shifted + 0.3

# ADX produced by the pipeline custom factor.
ADX_custom_factor = results['yesterday_adx']

# Overlay the talib ADX and the custom-factor ADX for the last two months.
plt.figure()
plt.title('SPY ADX-7 Comparison')
ADX_talib_shifted.plot(
    color='blue', marker='o', linestyle='dashed', linewidth=1)
ADX_custom_factor.plot(
    color='green', marker='o', linewidth=1)
plt.legend(['ADX, talib (shifted by 1 day)', 'ADX_custom_factor'])
plt.xlim('2019-09-25', '2019-11-25')
plt.show()