from quantopian.research import run_pipeline
from quantopian.pipeline import Pipeline
from quantopian.pipeline import factors, filters, classifiers
from quantopian.pipeline.filters import StaticAssets
from quantopian.pipeline.factors import CustomFactor, Returns, Latest, AverageDollarVolume, SimpleMovingAverage
from quantopian.pipeline.classifiers.morningstar import Sector
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.data.builtin import USEquityPricing
import math
import datetime
import numpy as np
import pandas as pd
MORNINGSTAR_SECTOR_CODES = {
    -1: 'Misc',
    101: 'Basic Materials',
    102: 'Consumer Cyclical',
    103: 'Financial Services',
    104: 'Real Estate',
    205: 'Consumer Defensive',
    206: 'Healthcare',
    207: 'Utilities',
    308: 'Communication Services',
    309: 'Energy',
    310: 'Industrials',
    311: 'Technology',
}
def high_volume_universe(top_liquid, min_price=None, min_volume=None):
    """
    Computes a universe of liquid stocks, filtering out names that are
    hard to trade.

    Returns
    -------
    high_volume_tradable : zipline.pipeline.Filter
    """
    full_filter = filters.make_us_equity_universe(
        target_size=top_liquid,
        rankby=factors.AverageDollarVolume(window_length=200),
        mask=filters.default_us_equity_universe_mask(),
        groupby=classifiers.morningstar.Sector(),
        max_group_weight=0.3,
        smoothing_func=lambda f: f.downsample('month_start'),
    )
    if min_price is not None:
        price = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                    window_length=21, mask=full_filter)
        full_filter &= (price >= min_price)
    if min_volume is not None:
        volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
                                     window_length=21, mask=full_filter)
        full_filter &= (volume >= min_volume)
    return full_filter
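# Sketch of intended usage (hypothetical parameters): this only builds a
# lazily evaluated Filter; nothing is computed until it is attached to a
# Pipeline and run.
# universe = high_volume_universe(500, min_price=5.0, min_volume=100000)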
def run_pipeline_chunks(pipe, start_date, end_date, chunks_len=None):
    """
    Drop-in replacement for run_pipeline.

    run_pipeline can fail over a very long period of time (memory usage),
    so we split the date range into chunks, run the pipeline on each chunk,
    and concatenate the results. chunks_len, if given, must be a
    pd.Timedelta; it defaults to 50 weeks.
    """
    chunks = []
    current = pd.Timestamp(start_date)
    end = pd.Timestamp(end_date)
    step = pd.Timedelta(weeks=50) if chunks_len is None else chunks_len
    while current <= end:
        current_end = current + step
        if current_end > end:
            current_end = end
        print('Running pipeline: %s - %s' % (current, current_end))
        results = run_pipeline(pipe, current.strftime("%Y-%m-%d"),
                               current_end.strftime("%Y-%m-%d"))
        chunks.append(results)
        # The pipeline may return more days than requested (when a chunk
        # boundary is not a trading day), so take the last date actually
        # returned and resume from the following day.
        current_end = results.index.get_level_values(0)[-1].tz_localize(None)
        current = current_end + pd.Timedelta(days=1)
    return pd.concat(chunks)
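# Sketch of intended usage (hypothetical pipeline object): the chunk length
# can be overridden by passing a pd.Timedelta, e.g. 26-week chunks instead
# of the 50-week default. Commented out since it would kick off a real query.
# results = run_pipeline_chunks(my_pipe, '2014-01-01', '2016-01-01',
#                               chunks_len=pd.Timedelta(weeks=26))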
def construct_factor_history(factor_cls, start_date='2015-10-1', end_date='2016-2-1',
                             factor_name='factor', top_liquid=500,
                             sector_column=None, filter_universe=True):
    """
    Creates a DataFrame containing daily factor values and sector codes for a
    liquidity-constrained universe. The returned DataFrame can be used in the
    factor tear sheet.
    """
    if filter_universe:  # this is very slow!
        ok_universe = high_volume_universe(top_liquid)
    else:
        ok_universe = AverageDollarVolume(window_length=20).top(top_liquid)
    factor = factor_cls(mask=ok_universe)
    sector = Sector(mask=ok_universe)
    pipe = Pipeline()
    pipe.add(factor, factor_name)
    if sector_column is not None:  # this is very slow too
        pipe.add(sector, sector_column)
    pipe.set_screen(ok_universe)
    daily_factor = run_pipeline_chunks(pipe, start_date=start_date, end_date=end_date)
    # daily_factor = run_pipeline(pipe, start_date=start_date, end_date=end_date)  # non-chunked alternative
    return daily_factor.dropna()
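# Sketch of intended usage (hypothetical factor): one month of trailing
# 21-day returns over the 500 most liquid names, skipping the slow universe
# filter. Commented out since it would kick off a real query.
# momentum = construct_factor_history(
#     lambda mask: Returns(window_length=21, mask=mask),
#     start_date='2016-01-01', end_date='2016-02-01',
#     factor_name='momentum', top_liquid=500,
#     sector_column=None, filter_universe=False)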
def get_daily_price(sid_universe, start_date, end_date, extra_days_before=0, extra_days_after=0):
    """
    Fetches daily prices for the given securities, padding the date range
    with enough extra calendar days to cover the requested number of extra
    trading days on each side.
    """
    # convert trading days to calendar days, with a small safety margin
    extra_days = int(math.ceil(extra_days_before * 365.0 / 252.0)) + 3
    start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d") - datetime.timedelta(days=extra_days)
    start_date = start_date.strftime("%Y-%m-%d")
    extra_days = int(math.ceil(extra_days_after * 365.0 / 252.0)) + 3
    end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d") + datetime.timedelta(days=extra_days)
    end_date = end_date.strftime("%Y-%m-%d")
    pricing = get_pricing(sid_universe, start_date=start_date, end_date=end_date, fields='open_price')
    return pricing
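# Sketch of intended usage (hypothetical sids): pad the window by 5 trading
# days on each side, as the average-returns plots require. Commented out
# since it would fetch real pricing data.
# prices = get_daily_price([24, 5061], '2016-01-01', '2016-02-01',
#                          extra_days_before=5, extra_days_after=5)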
import alphalens
import alphalens.tears
import alphalens.utils
def run_tear_sheet(factor,
                   factor_name,
                   start_date,
                   end_date,
                   top_liquid,
                   filter_universe,
                   show_sector_plots,
                   avgretplot,
                   periods,
                   quantiles,
                   bins,
                   filter_zscore,
                   long_short,
                   prices_cache=None):

    sector_column = 'sector_code' if show_sector_plots else None
    days_before, days_after = (0, 0)

    if avgretplot is not None:
        days_before, days_after = avgretplot
        days_after = max(days_after, max(periods) + 1)

    #
    ## Run the Pipeline
    #
    print('construct factor history')
    factor = construct_factor_history(factor, start_date=start_date, end_date=end_date,
                                      factor_name=factor_name, top_liquid=top_liquid,
                                      sector_column=sector_column, filter_universe=filter_universe)
    #
    ## Get prices
    #
    sid_universe = set(factor.index.levels[1].unique())
    if prices_cache is not None:
        cached_sids = set(prices_cache.columns)
        sid_universe -= cached_sids
    print('Get pricing for %d entries' % len(sid_universe))
    if sid_universe:
        prices = get_daily_price(sid_universe, start_date=start_date, end_date=end_date,
                                 extra_days_before=days_before, extra_days_after=days_after)
        if prices_cache is not None:
            prices = pd.concat([prices, prices_cache], axis=1)
    else:
        prices = prices_cache
    #
    ## Use Alphalens to create a factor tear sheet
    #
    print('alphalens')
    sectors_series = factor[sector_column] if show_sector_plots else None
    factor_data = alphalens.utils.get_clean_factor_and_forward_returns(factor=factor[factor_name],
                                                                       prices=prices,
                                                                       groupby=sectors_series,
                                                                       by_group=False,
                                                                       quantiles=quantiles,
                                                                       bins=bins,
                                                                       periods=periods,
                                                                       filter_zscore=filter_zscore,
                                                                       groupby_labels=MORNINGSTAR_SECTOR_CODES)
    if avgretplot:
        alphalens.tears.create_event_returns_tear_sheet(factor_data=factor_data,
                                                        prices=prices,
                                                        avgretplot=avgretplot,
                                                        long_short=long_short,
                                                        by_group=show_sector_plots)
    alphalens.tears.create_full_tear_sheet(factor_data=factor_data,
                                           long_short=long_short,
                                           group_adjust=False,
                                           by_group=show_sector_plots)
    return prices
from quantopian.pipeline.data.alpha_vertex import precog_top_100, precog_top_500

factor_name = 'factor'
start_date = '2010-01-01'
end_date = '2017-02-15'
top_liquid = 800
filter_universe = True     # very slow, filters out untradable stocks
show_sector_plots = False  # very slow to load the sector column in the pipeline

# alphalens specific
periods = (1, 3, 5, 10)
avgretplot = (2, 7)   # (days_before, days_after), or None to skip the plot
filter_zscore = None
long_short = True
prices_cache = None   # saves lots of time when running the tear sheet multiple times

quantiles = None
bins = [-100, -0.03, -0.01, 0., 0.01, 0.03, 100]  # fixed edges in predicted log-return units
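# Sanity check on the fixed bin edges (hypothetical factor values): seven
# edges give alphalens six groups, mirroring the quantiles=6 runs below.
print(pd.cut(pd.Series([-0.05, -0.02, -0.005, 0.005, 0.02, 0.05]),
             bins=bins).cat.codes.tolist())  # -> [0, 1, 2, 3, 4, 5]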
def factor(mask):
    return Latest(inputs=[precog_top_100.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)
def factor(mask):
    return Latest(inputs=[precog_top_500.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)
quantiles = 6   # switch from fixed bins to six equal-size quantile buckets
bins = None
def factor(mask):
    return Latest(inputs=[precog_top_100.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)
def factor(mask):
    return Latest(inputs=[precog_top_500.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)
long_short = False   # don't demean returns; evaluate the factor as long-only
def factor(mask):
    return Latest(inputs=[precog_top_100.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)
def factor(mask):
    return Latest(inputs=[precog_top_500.predicted_five_day_log_return], mask=mask)

prices_cache = run_tear_sheet(factor=factor,
                              factor_name=factor_name,
                              start_date=start_date,
                              end_date=end_date,
                              top_liquid=top_liquid,
                              filter_universe=filter_universe,
                              show_sector_plots=show_sector_plots,
                              avgretplot=avgretplot,
                              periods=periods,
                              quantiles=quantiles,
                              bins=bins,
                              filter_zscore=filter_zscore,
                              long_short=long_short,
                              prices_cache=prices_cache)