from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.pipeline.data import EquityPricing, factset
from quantopian.pipeline.factors import Returns, SimpleMovingAverage
from quantopian.pipeline.domain import (
AT_EQUITIES, # Austria
AU_EQUITIES, # Australia
BE_EQUITIES, # Belgium
BR_EQUITIES, # Brazil
CA_EQUITIES, # Canada
CH_EQUITIES, # Switzerland
CN_EQUITIES, # China
DE_EQUITIES, # Germany
DK_EQUITIES, # Denmark
ES_EQUITIES, # Spain
FI_EQUITIES, # Finland
FR_EQUITIES, # France
GB_EQUITIES, # Great Britain
HK_EQUITIES, # Hong Kong
IE_EQUITIES, # Ireland
IN_EQUITIES, # India
IT_EQUITIES, # Italy
JP_EQUITIES, # Japan
KR_EQUITIES, # South Korea
NL_EQUITIES, # Netherlands
NO_EQUITIES, # Norway
NZ_EQUITIES, # New Zealand
PT_EQUITIES, # Portugal
SE_EQUITIES, # Sweden
SG_EQUITIES, # Singapore
US_EQUITIES, # United States
)
from quantopian.research import run_pipeline
import pandas as pd
import numpy as np
import time
# The helper function below returns Alphalens-formatted factor and forward-returns
# data, given a pipeline factor, a domain, and date bounds.
def evaluate_factor(factor,
                    domain,
                    start_date,
                    end_date,
                    factor_screen=None,
                    quantiles=5,
                    returns_lengths=(1, 5, 10)):
    """Analyze a Pipeline Factor using Alphalens.

    Runs two pipelines over ``domain`` — one producing factor values and
    quantile labels, one producing trailing returns — then shifts the
    returns back in time to form forward returns and merges everything
    into a single Alphalens-compatible frame.

    Parameters
    ----------
    factor : quantopian.pipeline.factors.Factor
        Factor producing scores to be evaluated.
    domain : quantopian.pipeline.domain.Domain
        Domain on which the factor should be evaluated.
    start_date : str or pd.Timestamp
        Start date for evaluation period.
    end_date : str or pd.Timestamp
        End date for evaluation period.
    factor_screen : quantopian.pipeline.filters.Filter, optional
        Filter defining which assets ``factor`` should be evaluated on.
        Default is ``factor.notnull()``.
    quantiles : int, optional
        Number of buckets to use for quantile groups. Default is 5.
    returns_lengths : sequence[int]
        Forward-returns horizons to use when evaluating ``factor``.
        Default is 1-day, 5-day, and 10-day returns.

    Returns
    -------
    factor_data : pd.DataFrame
        A (date, asset)-indexed DataFrame with the following columns:

        'factor' : float64
            Values produced by ``factor``.
        'factor_quantile' : int64
            Daily quantile label for each asset's factor value.
        '<N>D' : float64
            One column per entry in ``returns_lengths``, holding the
            forward returns over that horizon.
    """
    calendar = domain.calendar
    # Roll input dates to the next trading session.
    start_date = calendar.minute_to_session_label(pd.Timestamp(start_date, tz='UTC'))
    end_date = calendar.minute_to_session_label(pd.Timestamp(end_date, tz='UTC'))
    if factor_screen is None:
        # By default, evaluate on every asset for which the factor is defined.
        factor_screen = factor.notnull()
    # Run pipeline to get factor values and quantiles.
    display('Getting factor values...')
    factor_pipe = Pipeline(
        {'factor': factor,
         'factor_quantile': factor.quantiles(quantiles, mask=factor_screen)},
        screen=factor_screen,
        domain=domain,
    )
    factor_results = run_pipeline(factor_pipe, start_date, end_date, chunksize=250)
    column_order = []
    returns_cols = {}
    for length in returns_lengths:
        colname = '{}D'.format(length)
        column_order.append(colname)
        # Add 1 because "1-day" returns needs 2 price observations.
        returns_cols[colname] = Returns(window_length=length + 1)
    returns_pipe = Pipeline(returns_cols, domain=domain)
    # Compute returns for the period after the factor pipeline, then
    # shift the results back to align with our factor values.
    display('Getting forward returns values...')
    returns_start_date = start_date
    # Extend the end date so the longest horizon still has data to shift back.
    returns_end_date = end_date + domain.calendar.day * max(returns_lengths)
    raw_returns = run_pipeline(returns_pipe, returns_start_date, returns_end_date, chunksize=500)
    shifted_returns = {}
    for name, length in zip(column_order, returns_lengths):
        # Shift 1-day returns back by a day, 5-day returns back by 5 days, etc.
        raw = raw_returns[name]
        shifted_returns[name] = backshift_returns_series(raw, length)
    # Merge backshifted returns into a single frame indexed like our desired output.
    display('Merging factor values with forward returns...')
    merged_returns = pd.DataFrame(
        data=shifted_returns,
        index=factor_results.index,
        columns=column_order,
    )
    # Concat factor results and forward returns column-wise.
    merged = pd.concat([factor_results, merged_returns], axis=1)
    merged.index.set_names(['date', 'asset'], inplace=True)
    # Drop rows missing either a factor value or any forward return.
    merged = merged.dropna(how='any')
    # Add a Business Day Offset to the DateTimeIndex (Alphalens expects a freq).
    merged.index.levels[0].freq = pd.tseries.offsets.BDay()
    display('Complete')
    return merged
def backshift_returns_series(series, N):
    """Shift a multi-indexed series backwards by N observations in the
    first (date) level.

    This can be used to convert a backward-looking returns series into a
    forward-returns series: the value observed at date ``d + N`` is
    re-labeled to date ``d``, and the last N dates are dropped from the
    output index.

    Parameters
    ----------
    series : pd.Series
        Series with a two-level (date, asset) MultiIndex, sorted by date.
    N : int
        Number of dates to shift back by.

    Returns
    -------
    pd.Series
        The shifted series, indexed by the first ``len(dates) - N`` dates.
    """
    ix = series.index
    dates, sids = ix.levels
    # pandas >= 0.24 renamed ``MultiIndex.labels`` to ``codes`` (and later
    # removed ``labels``); support both old and new pandas.
    raw_codes = getattr(ix, 'codes', None)
    if raw_codes is None:
        raw_codes = ix.labels
    date_labels, sid_labels = map(np.array, raw_codes)

    # Output date labels will contain all but the last N dates.
    new_dates = dates[:-N]

    # Output data will remove the first M rows, where M is the index of the
    # last record with one of the first N dates.
    cutoff = date_labels.searchsorted(N)
    new_date_labels = date_labels[cutoff:] - N
    new_sid_labels = sid_labels[cutoff:]
    new_values = series.values[cutoff:]

    assert new_date_labels[0] == 0

    try:
        # pandas >= 0.24 constructor keyword.
        new_index = pd.MultiIndex(
            levels=[new_dates, sids],
            codes=[new_date_labels, new_sid_labels],
            sortorder=1,
            names=ix.names,
        )
    except TypeError:
        # Older pandas (Quantopian research environment) keyword.
        new_index = pd.MultiIndex(
            levels=[new_dates, sids],
            labels=[new_date_labels, new_sid_labels],
            sortorder=1,
            names=ix.names,
        )
    return pd.Series(data=new_values, index=new_index)
# Create the factor to test and its associated filter, then specify the domain
# and date range over which to run Alphalens.
# Our alpha factor.
from quantopian.pipeline.data.factset import Fundamentals
class Momentum(CustomFactor):
    """Price momentum: ratio of the newest close to the oldest close
    within the lookback window."""
    # Use daily close prices by default.
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        newest, oldest = close[-1], close[0]
        out[:] = newest / oldest
# Momentum over roughly one month (22 trading days).
momentum_1m = Momentum(window_length=22)
# Momentum over roughly six months (132 trading days).
momentum_6m = Momentum(window_length=132)
# Six-month momentum with the most recent month backed out.
momentum_2_6m = momentum_6m/momentum_1m
# Most recent annual earnings yield.
earningyield = Fundamentals.earn_yld_af.latest
# Top half of the universe by earnings yield.
high_ey = earningyield.percentile_between(50, 100)
# Most recent annual price-to-book ratio.
price_book = Fundamentals.pbk_af.latest
# Bottom half of the universe by price-to-book.
low_pb = price_book.percentile_between(0, 50)
# Most recent annual return on invested capital.
roic = Fundamentals.roic_af.latest
# Top half of the universe by ROIC.
high_roic = roic.percentile_between(50, 100)
# Latest public market capitalization.
market_cap = Fundamentals.mkt_val_public.latest
# Bottom half of the universe by market cap.
small_cap = market_cap.percentile_between(0,50)
# Latest daily share volume, used below for a liquidity screen.
volume = EquityPricing.volume.latest
# Composite value/quality filter (cheap on earnings and book, high ROIC).
# NOTE(review): defined but unused below — presumably kept as an alternate screen.
erp5= high_ey & low_pb & high_roic
# Below is the factor to pass to Alphalens to analyze.
# This must be a factor which returns a numerical number proportional to alpha for each security
# It cannot be a filter
my_factor = earningyield
# Below is the filter used by Alphalens.
my_filter = (volume > 50000) & small_cap
# Date range to run Alphalens
start_date = '2007-1-1'
end_date = '2019-1-1'
# Specify the domain to use for data
my_domain = JP_EQUITIES
# Convert our factor into Alphalens-formatted factor/forward-returns data
# over the chosen domain and date window.
al_data = evaluate_factor(
    factor=my_factor,
    domain=my_domain,
    start_date=start_date,
    end_date=end_date,
    factor_screen=my_filter,
)

# Feed the formatted data into an Alphalens full tear sheet.
from alphalens.tears import create_full_tear_sheet

create_full_tear_sheet(al_data)