from quantopian.research import run_pipeline
from quantopian.pipeline import Pipeline
from quantopian.pipeline import factors, filters, classifiers
from quantopian.pipeline.classifiers import Classifier
from quantopian.pipeline.factors import CustomFactor, Returns, AverageDollarVolume, SimpleMovingAverage
from quantopian.pipeline.filters import StaticAssets, Q500US, Q1500US, Q3000US, QTradableStocksUS
from quantopian.pipeline.filters.fundamentals import IsPrimaryShare
from quantopian.pipeline.classifiers.fundamentals import Sector
from quantopian.pipeline.data.builtin import USEquityPricing
import alphalens
import math
import datetime
import numpy as np
import pandas as pd
## Helper functions
def high_volume_universe(top_liquid):
    """
    Build a tradable-universe filter of liquid stocks.

    Parameters
    ----------
    top_liquid : str or int
        Either the string 'QTradableStocksUS' or one of the preset sizes
        500/1500/3000 (mapped to the corresponding Q*US filters); any other
        value is treated as a custom target size for
        filters.make_us_equity_universe.

    Returns
    -------
    zipline.pipeline.Filter
    """
    # Preset universes keyed by the accepted `top_liquid` values.
    presets = {
        'QTradableStocksUS': QTradableStocksUS,
        500: Q500US,
        1500: Q1500US,
        3000: Q3000US,
    }
    if top_liquid in presets:
        return presets[top_liquid]()
    # Fallback: custom-sized universe ranked by 200-day average dollar volume,
    # sector-capped and smoothed monthly to limit turnover.
    return filters.make_us_equity_universe(
        target_size=top_liquid,
        rankby=factors.AverageDollarVolume(window_length=200),
        mask=filters.default_us_equity_universe_mask(),
        groupby=Sector(),
        max_group_weight=0.3,
        smoothing_func=lambda f: f.downsample('month_start'),
    )
def run_pipeline_chunks(pipe, start_date, end_date, chunks_len=None):
    """
    Drop-in replacement for run_pipeline.

    run_pipeline fails over a very long period of time (memory usage),
    so we split the period into chunks, run the pipeline on each chunk and
    concatenate the results.

    Parameters
    ----------
    pipe : Pipeline
    start_date, end_date : str or Timestamp-like
    chunks_len : pd.Timedelta, optional
        Length of each chunk; defaults to 70 weeks.

    Returns
    -------
    pd.DataFrame
        Concatenation of the per-chunk run_pipeline results.
    """
    chunks = []
    current = pd.Timestamp(start_date)
    end = pd.Timestamp(end_date)
    step = pd.Timedelta(weeks=70) if chunks_len is None else chunks_len
    while current <= end:
        current_end = current + step
        if current_end > end:
            current_end = end
        # single pre-formatted string so this works on both Python 2 and 3
        print('Running pipeline: %s - %s' % (current, current_end))
        results = run_pipeline(pipe, current.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d"))
        # Guard against an empty chunk (e.g. a window with no trading days):
        # the original indexed [-1] unconditionally, which would raise.
        if len(results) > 0:
            chunks.append(results)
            # pipeline returns more days than requested (if no trading day),
            # so advance from the last date actually present in the results
            current_end = results.index.get_level_values(0)[-1].tz_localize(None)
        current = current_end + pd.Timedelta(days=1)
    return pd.concat(chunks)
def construct_factor_history(factor_cls, start_date='2015-10-1', end_date='2016-2-1',
                             factor_name='factor', top_liquid=500,
                             sector_column=None):
    """
    Create a DataFrame of daily factor values (and, optionally, sector codes)
    for a liquidity-constrained universe. The result can be fed to the factor
    tear sheet.

    Parameters
    ----------
    factor_cls : callable
        Called as factor_cls(mask=universe) to build the pipeline factor.
    sector_column : str, optional
        When given, a Sector classifier is added under this column name.
    """
    universe = high_volume_universe(top_liquid)
    pipe = Pipeline()
    pipe.add(factor_cls(mask=universe), factor_name)
    if sector_column is not None:
        pipe.add(Sector(mask=universe), sector_column)
    pipe.set_screen(universe)
    # chunked execution avoids run_pipeline memory blow-ups over long periods
    history = run_pipeline_chunks(pipe, start_date=start_date, end_date=end_date)
    return history.dropna()
def get_daily_price(sid_universe, start_date, end_date, extra_days_before=0, extra_days_after=0):
    """
    Fetch daily open prices for `sid_universe`, widening the requested window
    by `extra_days_before`/`extra_days_after` *trading* days on each side.

    Trading days are converted to calendar days with a 365/252 factor, plus a
    3-day safety margin.
    """
    fmt = "%Y-%m-%d"
    # trading-day padding -> calendar days, with slack "just to be sure"
    pad_before = math.ceil(extra_days_before * 365.0 / 252.0) + 3
    pad_after = math.ceil(extra_days_after * 365.0 / 252.0) + 3
    first_day = datetime.datetime.strptime(start_date, fmt) - datetime.timedelta(days=pad_before)
    last_day = datetime.datetime.strptime(end_date, fmt) + datetime.timedelta(days=pad_after)
    return get_pricing(sid_universe,
                       start_date=first_day.strftime(fmt),
                       end_date=last_day.strftime(fmt),
                       fields='open_price')
#
# 'run_tear_sheet' glues all the functions together to make it easier to run the tear sheet on a pipeline factor
#
def run_tear_sheet(factor,
                   factor_name,
                   start_date,
                   end_date,
                   top_liquid,
                   show_sector_plots,
                   avgretplot,
                   periods,
                   quantiles,
                   bins,
                   filter_zscore,
                   long_short,
                   group_neutral,
                   prices_cache = None):
    """
    Run the full Alphalens tear sheet for a pipeline factor: build the factor
    history, fetch pricing, clean the data and render the returns (and
    optionally event-returns) tear sheets.

    Parameters
    ----------
    factor : callable
        Called as factor(mask=universe) to construct the pipeline factor.
    prices_cache : pd.DataFrame, optional
        Previously fetched prices; only missing sids are downloaded.

    Returns
    -------
    pd.DataFrame
        The (possibly extended) prices, to be passed back as `prices_cache`
        on subsequent runs.
    """
    # Sector data is needed either for per-sector plots or group-neutral stats
    sector_column = 'sector_code' if (show_sector_plots or group_neutral) else None
    days_before, days_after = (0, 0)
    if avgretplot is not None:
        days_before, days_after = avgretplot
        # make sure we have enough forward days for the longest period too
        days_after = max(days_after, max(periods) + 1)
    #
    ## Run the Pipeline
    #
    print('construct factor history')
    factor = construct_factor_history(factor, start_date=start_date, end_date=end_date,
                                      factor_name=factor_name, top_liquid=top_liquid,
                                      sector_column=sector_column)
    #
    ## Get prices
    #
    sid_universe = set(factor.index.levels[1].unique())
    if prices_cache is not None:
        # only download sids we don't already have prices for
        sid_universe -= set(prices_cache.columns)
    print('Get pricing for %d entries' % len(sid_universe))
    if sid_universe:
        prices = get_daily_price(sid_universe, start_date=start_date, end_date=end_date,
                                 extra_days_before=days_before, extra_days_after=days_after)
        if prices_cache is not None:
            prices = pd.concat([prices, prices_cache], axis=1)
    else:
        prices = prices_cache
    #
    ## Use Alphalens to create a factor tear sheet
    #
    print('Alphalens')
    # BUG FIX: the original test `len(np.isinf(series)) > 0` was always true
    # for a non-empty factor; use .any() to check whether infinities exist.
    if np.isinf(factor[factor_name]).any():
        print('Dropping inf or -inf values from factor')
        factor[factor_name] = factor[factor_name].replace([np.inf, -np.inf], np.nan)
    sector_labels = dict(Sector.SECTOR_NAMES)
    sector_labels[-1] = "Unknown"  # no dataset is perfect, better handle the unexpected
    sectors_series = factor[sector_column] if sector_column is not None else None
    factor_data = alphalens.utils.get_clean_factor_and_forward_returns(factor=factor[factor_name],
                                                                       prices=prices,
                                                                       groupby=sectors_series,
                                                                       binning_by_group=group_neutral,
                                                                       quantiles=quantiles,
                                                                       bins=bins,
                                                                       periods=periods,
                                                                       filter_zscore=filter_zscore,
                                                                       groupby_labels=sector_labels,
                                                                       max_loss=0.35)
    if avgretplot:
        alphalens.tears.create_event_returns_tear_sheet(factor_data=factor_data,
                                                        prices=prices,
                                                        avgretplot=avgretplot,
                                                        long_short=long_short,
                                                        group_neutral=group_neutral,
                                                        std_bar=False,
                                                        by_group=show_sector_plots)
    alphalens.tears.create_returns_tear_sheet(factor_data=factor_data,
                                              long_short=long_short,
                                              group_neutral=group_neutral,
                                              by_group=show_sector_plots)
    return prices
import statsmodels.api as sm
from sklearn.decomposition import PCA
#from sklearn.decomposition import FastICA as PCA
def _get_residuals(data, factors):
    """Return the OLS residuals of `data` regressed on `factors`."""
    return sm.OLS(data, factors).fit().resid
def get_PCA_regressions(data, n_components=3):
    """
    Regress each column of `data` on its first `n_components` principal
    components (plus a constant) and return the residuals, column by column.
    """
    decomposition = PCA(n_components=n_components)
    decomposition.fit(data)
    # common risk factors extracted from the cross-section of returns
    common = sm.add_constant(decomposition.transform(data))
    # residuals = the part of each series the common components don't explain
    return np.apply_along_axis(_get_residuals, 0, data, common)
class PCAregress(CustomFactor):
    """
    CustomFactor ranking assets by the z-score of their PCA-regression
    residuals: assets whose returns deviate most from the common components
    get the most extreme (negated) scores.
    """
    #inputs = [Returns, LogReturns, USEquityPricing.close ...]
    params = ('n_components', 'variable_components', )
    window_safe = True
    def compute(self, today, assets, out, returns, n_components, variable_components):
        try:
            if variable_components is not None:
                # BUG FIX: use floor division. Plain `/` floors on Python 2 but
                # yields a float on Python 3, where sklearn's PCA would then
                # interpret n_components as an explained-variance fraction.
                n_components = len(assets) // variable_components
                if n_components <= 0:
                    n_components = 1
            returns = np.nan_to_num(returns)  # PCA/OLS can't handle NaNs
            resids = get_PCA_regressions(returns, n_components)
            # z-score over the whole residual matrix, then take today's row
            zscores = (resids - resids.mean()) / resids.std()
            out[:] = -zscores[-1]
        except Exception:
            # best effort: a failed decomposition yields a neutral score
            out[:] = 0.
class JoinFactors(CustomFactor):
    """
    Merge several factors into one: for each asset, sum the non-NaN values
    across inputs; assets that are NaN in every input stay NaN.
    """
    #inputs = [factor1, factor2, ...]
    window_length = 1
    def compute(self, today, assets, out, *inputs):
        stacked = np.concatenate(inputs, axis=0)
        combined = np.nansum(stacked, axis=0)
        missing_everywhere = np.all(np.isnan(stacked), axis=0)
        out[:] = combined
        out[missing_everywhere] = np.nan
def make_PCA_factor(mask, window_length, n_components, variable_components=None):
    """
    Build the full PCA-residual factor: one PCAregress per sector (so the
    components are extracted within each sector), joined into a single factor.
    """
    daily_returns = Returns(window_length=2, mask=mask)
    sector = Sector(mask=mask)
    per_sector = [
        PCAregress(mask=sector.eq(code), inputs=[daily_returns],
                   window_length=window_length,
                   n_components=n_components,
                   variable_components=variable_components)
        for code in Sector.SECTOR_NAMES.keys()
    ]
    return JoinFactors(mask=mask, inputs=per_sector)
# --- Tear-sheet configuration -------------------------------------------
factor_name = 'factor'  # column name for the factor in the pipeline output
start_date = '2005-01-01'
#start_date = '2015-12-01'
end_date = '2016-01-01'
top_liquid = 'QTradableStocksUS'  # universe selector (see high_volume_universe)
show_sector_plots = False  # per-sector breakdowns in the tear sheet
# alphalens specific
periods = (1, 3, 5, 7, 12, 20)  # forward-return horizons, in trading days
quantiles = 7  # number of factor quantiles (bins must be None when set)
bins = None
avgretplot = (0, 20) # use None to avoid plotting or (days_before, days_after)
filter_zscore = None  # no outlier filtering of forward returns
long_short = True  # demean returns for a long/short portfolio view
group_neutral = True  # compute statistics sector-neutrally
prices_cache = None # this saves lots of time when running tear sheet multiple times
#
## Run the tear sheet once per (window_length, n_components) configuration.
## The seven copy-pasted factor definitions and run_tear_sheet calls are
## collapsed into a single loop; the sequence of calls is unchanged.
## `prices_cache` is threaded through so pricing is downloaded only once.
#
_pca_configs = [
    (30, 0.90),
    (75, 0.90),
    (150, 0.90),
    (252, 0.90),
    (252, 0.80),
    (252, 0.60),
    (252, 3),  # a fixed component count instead of an explained-variance target
]
for _window_length, _n_components in _pca_configs:
    # default-argument binding captures the loop values immediately,
    # avoiding the late-binding closure pitfall
    def factor(mask, _wl=_window_length, _nc=_n_components):
        return make_PCA_factor(mask=mask, window_length=_wl, n_components=_nc)
    prices_cache = \
    run_tear_sheet( factor = factor,
                    factor_name = factor_name,
                    start_date = start_date,
                    end_date = end_date,
                    top_liquid = top_liquid,
                    show_sector_plots = show_sector_plots,
                    avgretplot = avgretplot,
                    periods = periods,
                    quantiles = quantiles,
                    bins = bins,
                    filter_zscore = filter_zscore,
                    long_short = long_short,
                    group_neutral = group_neutral,
                    prices_cache = prices_cache)