10))))$
delta(close, 1) : (-1 * delta(close, 1))))$
Imports
from quantopian.research import run_pipeline
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import Latest
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, AverageDollarVolume, Returns, RSI
from quantopian.pipeline.classifiers.morningstar import Sector
from quantopian.pipeline.filters import Q500US, Q1500US
from quantopian.pipeline.data.quandl import fred_usdontd156n as libor
from quantopian.pipeline.data.zacks import EarningsSurprises
from quantopian.pipeline import CustomFilter
from quantopian.pipeline.factors import VWAP
import statsmodels.api as sm
import talib
import pandas as pd
import numpy as np
from time import time
import alphalens as al
import pyfolio as pf
from scipy import stats
import matplotlib.pyplot as plt
from sklearn import linear_model, decomposition, ensemble, preprocessing, isotonic, metrics
Run the pipeline in chunks
def run_pipeline_chunks(pipe, start_date, end_date, chunks_len = None):
"""
Drop-in replacement for run_pipeline.
run_pipeline fails over a very long period of time (memery usage),
so we need to split in chunks the pipeline and concatenate the results
"""
chunks = []
current = pd.Timestamp(start_date)
end = pd.Timestamp(end_date)
step = pd.Timedelta(weeks=26) if chunks_len is None else chunks_len
start_pipeline_timer = time()
while current <= end:
current_end = current + step
if current_end > end:
current_end = end
start_timer = time()
print 'Running pipeline:', current, ' - ', current_end
results = run_pipeline(pipe, current.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
chunks.append(results)
# pipeline returns more days than requested (if no trading day), so get last date from the results
current_end = results.index.get_level_values(0)[-1].tz_localize(None)
current = current_end + pd.Timedelta(days=1)
end_timer = time()
print "Time to run this chunk of the pipeline %.2f secs" % (end_timer - start_timer)
end_pipeline_timer = time()
print "Time to run the entire pipeline %.2f secs" % (end_pipeline_timer - start_pipeline_timer)
return pd.concat(chunks)
Sector Codes
# Morningstar sector code -> human-readable sector name
# (used to label the sector groups in the Alphalens tear sheet).
MORNINGSTAR_SECTOR_CODES = dict([
    (-1,  'Misc'),
    (101, 'Basic Materials'),
    (102, 'Consumer Cyclical'),
    (103, 'Financial Services'),
    (104, 'Real Estate'),
    (205, 'Consumer Defensive'),
    (206, 'Healthcare'),
    (207, 'Utilities'),
    (308, 'Communication Services'),
    (309, 'Energy'),
    (310, 'Industrials'),
    (311, 'Technology'),
])
make_factors creates the Alpha factors
def make_factors():
    """
    Build the alpha CustomFactor classes and return them keyed by name.

    The compute() bodies are machine-generated numpy translations of
    formulaic alpha expressions; the v* variable names mirror nodes of
    the original expression trees.
    """
    class Alpha5(CustomFactor):
        # Expression: rank(open - (sum(vwap, 10) / 10)) *
        #             (-1 * abs(rank(close - vwap)))
        vwap_in = VWAP(window_length=2)
        vwap_in.window_safe = True  # allow the VWAP output to feed this factor
        inputs = [USEquityPricing.close, USEquityPricing.open, vwap_in]
        window_length = 10
        def compute(self, today, assets, out, close, open, vwap):
            v000 = open[-1]  # latest open
            # 10-day sum of vwap, per asset
            v00100 = np.empty((10, out.shape[0]))
            for i0 in range(1, 11):
                v00100[-i0] = vwap[-i0]
            v0010 = v00100.sum(axis=0)
            v0011 = np.full(out.shape[0], 10.0)
            v001 = v0010 / v0011  # 10-day mean of vwap
            v00 = v000 - v001
            v0 = stats.rankdata(v00)  # cross-sectional rank
            v10 = np.full(out.shape[0], -1.0)
            v11000 = close[-1]
            v11001 = vwap[-1]
            v1100 = v11000 - v11001
            v110 = stats.rankdata(v1100)
            v11 = np.abs(v110)
            v1 = v10 * v11  # -abs(rank(close - vwap))
            out[:] = v0 * v1

    class Alpha8(CustomFactor):
        # Expression: -1 * rank((sum(open, 5) * sum(returns, 5)) -
        #                        delay(sum(open, 5) * sum(returns, 5), 10))
        inputs = [Returns(window_length=2), USEquityPricing.open]
        window_length = 16
        def compute(self, today, assets, out, returns, open):
            v0 = np.full(out.shape[0], -1.0)
            # sum of open over the most recent 5 days
            v10000 = np.empty((5, out.shape[0]))
            for i0 in range(1, 6):
                v10000[-i0] = open[-i0]
            v1000 = v10000.sum(axis=0)
            # sum of 1-day returns over the most recent 5 days
            v10010 = np.empty((5, out.shape[0]))
            for i0 in range(1, 6):
                v10010[-i0] = returns[-i0]
            v1001 = v10010.sum(axis=0)
            v100 = v1000 * v1001
            # the same open*returns product computed on the window 10 days earlier
            v101000 = np.empty((5, out.shape[0]))
            for i0 in range(11, 16):
                v101000[10-i0] = open[-i0]
            v10100 = v101000.sum(axis=0)
            v101010 = np.empty((5, out.shape[0]))
            for i0 in range(11, 16):
                v101010[10-i0] = returns[-i0]
            v10101 = v101010.sum(axis=0)
            v1010 = v10100 * v10101
            v101 = v1010 # delay(..., 10)
            v10 = v100 - v101
            v1 = stats.rankdata(v10)
            out[:] = v0 * v1

    class Alpha9(CustomFactor):
        # Expression: (0 < ts_min(delta(close, 1), 5)) ? delta(close, 1)
        #             : ((ts_max(delta(close, 1), 5) < 0) ? delta(close, 1)
        #                : (-1 * delta(close, 1)))
        inputs = [USEquityPricing.close]
        window_length = 7
        def compute(self, today, assets, out, close):
            v00 = np.full(out.shape[0], 0.0)
            # 5-day min of the 1-day close difference
            v010 = np.empty((5, out.shape[0]))
            for i0 in range(1, 6):
                v0100 = np.empty((2, out.shape[0]))
                for i1 in range(1, 3):
                    v0100[-i1] = close[-i0-i1]
                v010[-i0] = v0100[-1] - v0100[-2]
            v01 = np.min(v010, axis=0)
            v0 = v00 < v01  # True where every recent daily change was positive
            # latest 1-day close difference (delta(close, 1))
            v10 = np.empty((2, out.shape[0]))
            for i0 in range(1, 3):
                v10[-i0] = close[-i0]
            v1 = v10[-1] - v10[-2]
            # 5-day max of the 1-day close difference
            v2000 = np.empty((5, out.shape[0]))
            for i0 in range(1, 6):
                v20000 = np.empty((2, out.shape[0]))
                for i1 in range(1, 3):
                    v20000[-i1] = close[-i0-i1]
                v2000[-i0] = v20000[-1] - v20000[-2]
            v200 = np.max(v2000, axis=0)
            v201 = np.full(out.shape[0], 0.0)
            v20 = v200 < v201  # True where every recent daily change was negative
            v210 = np.empty((2, out.shape[0]))
            for i0 in range(1, 3):
                v210[-i0] = close[-i0]
            v21 = v210[-1] - v210[-2]
            v220 = np.full(out.shape[0], -1.0)
            v2210 = np.empty((2, out.shape[0]))
            for i0 in range(1, 3):
                v2210[-i0] = close[-i0]
            v221 = v2210[-1] - v2210[-2]
            v22 = v220 * v221  # sign-flipped latest change
            # inner ternary: keep delta where max < 0, else flip its sign
            v2lgcl = np.empty(out.shape[0])
            v2lgcl[v20] = v21[v20]
            v2lgcl[~v20] = v22[~v20]
            v2 = v2lgcl
            # outer ternary: keep delta where min > 0, else use the inner result
            vlgcl = np.empty(out.shape[0])
            vlgcl[v0] = v1[v0]
            vlgcl[~v0] = v2[~v0]
            out[:] = vlgcl

    # Factors exposed to make_pipeline(), keyed by pipeline column name.
    all_factors = {
        'Alpha5' : Alpha5,
        'Alpha8' : Alpha8,
        'Alpha9' : Alpha9,
    }
    return all_factors
Make the Pipeline
def make_pipeline(price_filter = 2000, min_price_filter = 1):
    """
    Build the research Pipeline.

    Universe: Q500US members whose latest close lies within
    [min_price_filter, price_filter].

    Columns: one per alpha factor from make_factors(), a 'test_alpha'
    column equal to the sum of the per-factor cross-sectional ranks,
    and the Morningstar 'Sector' classifier.

    Parameters
    ----------
    price_filter : maximum latest close price allowed in the universe
    min_price_filter : minimum latest close price allowed in the universe
    """
    # Price filters on the latest close.
    stock_price_filter = USEquityPricing.close.latest <= price_filter
    stock_price_min = USEquityPricing.close.latest >= min_price_filter
    # Before we do any other ranking, we want to throw away the bad assets.
    initial_screen = (stock_price_min & stock_price_filter & Q500US())
    # Instantiate each alpha factor, masked to the screened universe.
    # (The dead `pipe_cols = None` / `= {}` pre-assignments were removed:
    # the comprehension below rebinds the name anyway.)
    factors = make_factors()
    pipe_cols = {name: f(mask=initial_screen) for name, f in factors.iteritems()}
    # Combined signal: sum of the ranks of the individual alphas.
    pipe_cols["test_alpha"] = \
        (pipe_cols['Alpha5'].rank(mask=initial_screen) +
         pipe_cols['Alpha8'].rank(mask=initial_screen) +
         pipe_cols['Alpha9'].rank(mask=initial_screen))
    # Sector data for group-wise analysis in Alphalens.
    pipe_cols['Sector'] = Sector(mask=initial_screen)
    return Pipeline(columns=pipe_cols, screen=initial_screen)
Set the Timeframe for this test
# Backtest window for the pipeline run (inclusive).
start_date='2014-01-01'
end_date='2015-12-01'
Get the Pipeline data
# Run the pipeline over the test window in memory-friendly chunks.
result = run_pipeline_chunks(make_pipeline(),
                             start_date=start_date,
                             end_date=end_date)
# Peek at the first few rows ((date, asset) MultiIndex).
result.head()
Set the dates for the Alphalens test, extending 1 month into the past and 1 month into the future
# Pricing window extended ~1 month on each side of the pipeline window so
# Alphalens can compute forward returns at the longest requested period.
start_date_alphalens='2013-12-01'
end_date_alphalens='2016-01-01'
# All assets that were returned in the pipeline result.
assets = result.index.levels[1].unique()
# We need to get a little more pricing data than the length of our factor so we
# can compare forward returns. We'll tack on another month in this example.
# NOTE(review): get_pricing is a Quantopian research-environment global.
pricing = get_pricing(assets,
                      start_date=start_date_alphalens,
                      end_date=end_date_alphalens,
                      fields='open_price')
# NOTE(review): alphalens is already imported as `al` in the imports cell;
# this re-import is redundant but harmless.
import alphalens
# Full factor tear sheet for the combined alpha: forward returns at
# 1/5/10/20/30 days, 3 quantiles, grouped and labelled by Morningstar sector.
alphalens.tears.create_factor_tear_sheet(factor=result['test_alpha'],
                                         prices=pricing,
                                         groupby=result['Sector'],
                                         show_groupby_plots=True,
                                         periods=(1,5,10, 20, 30),
                                         quantiles=3,
                                         groupby_labels=MORNINGSTAR_SECTOR_CODES)