from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.pipeline.data import EquityPricing, factset
from quantopian.pipeline.factors import Returns, SimpleMovingAverage, AverageDollarVolume
from quantopian.pipeline.domain import (
AT_EQUITIES, # Austria
AU_EQUITIES, # Australia
BE_EQUITIES, # Belgium
CA_EQUITIES, # Canada
CH_EQUITIES, # Switzerland
CN_EQUITIES, # China
DE_EQUITIES, # Germany
DK_EQUITIES, # Denmark
ES_EQUITIES, # Spain
FI_EQUITIES, # Finland
FR_EQUITIES, # France
GB_EQUITIES, # Great Britain
HK_EQUITIES, # Hong Kong
IE_EQUITIES, # Ireland
IN_EQUITIES, # India
IT_EQUITIES, # Italy
JP_EQUITIES, # Japan
NL_EQUITIES, # Netherlands
NO_EQUITIES, # Norway
NZ_EQUITIES, # New Zealand
PT_EQUITIES, # Portugal
SE_EQUITIES, # Sweden
SG_EQUITIES, # Singapore
US_EQUITIES, # United States
)
from quantopian.research import run_pipeline
from quantopian.pipeline.filters import Q500US
# from quantopian.pipeline.classifiers.fundamentals import Sector
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# import talib as ta
The helper function below produces Alphalens-formatted factor and forward-returns data from a pipeline factor, a domain, and date bounds.
def evaluate_factor(factor,
                    domain,
                    start_date,
                    end_date,
                    factor_screen=None,
                    quantiles=5,
                    returns_lengths=(1, 5, 10),
                    session='Overnight',
                    chunksize=None,
                    ):
    """Analyze a Pipeline Factor using Alphalens.

    Parameters
    ----------
    factor : quantopian.pipeline.factors.Factor
        Factor producing scores to be evaluated.
    domain : quantopian.pipeline.domain.Domain
        Domain on which the factor should be evaluated.
    start_date : str or pd.Timestamp
        Start date for evaluation period.
    end_date : str or pd.Timestamp
        End date for evaluation period.
    factor_screen : quantopian.pipeline.filters.Filter, optional
        Filter defining which assets ``factor`` should be evaluated on.
        Default is ``factor.notnull()``.
    quantiles : int, optional
        Number of buckets to use for quantile groups. Default is 5.
    returns_lengths : sequence[int], optional
        Forward-returns horizons to use when evaluating ``factor``.
        Default is 1-day, 5-day, and 10-day returns.
    session : str, optional
        Which part of the trading day the forward returns cover:
        one of 'Overnight', 'Intraday', or 'Daily'. Default is 'Overnight'.
    chunksize : int, optional
        Chunk size (in sessions) passed to ``run_pipeline`` for the factor
        pipeline. Use ~252-504 if you run into memory problems.

    Returns
    -------
    factor_data : pd.DataFrame
        A (date, asset)-indexed DataFrame with the following columns:
        'factor': float64
            Values produced by ``factor``.
        'factor_quantile': int64
            Daily quantile label for each factor value.
        One '{length}D' float64 column of forward returns per entry in
        ``returns_lengths``.

    Raises
    ------
    ValueError
        If ``session`` is not one of 'Overnight', 'Intraday', 'Daily'.
    """
    # Validate up front so a bad `session` fails immediately. The original
    # check lived inside the returns loop (so it was skipped entirely for an
    # empty `returns_lengths`) and raised SystemExit, which callers cannot
    # catch as an ordinary error.
    if session not in ('Overnight', 'Intraday', 'Daily'):
        raise ValueError("session should be one of 'Overnight', 'Intraday', 'Daily'")

    calendar = domain.calendar
    # Roll input dates to the next trading session.
    start_date = calendar.minute_to_session_label(pd.Timestamp(start_date, tz='UTC'))
    end_date = calendar.minute_to_session_label(pd.Timestamp(end_date, tz='UTC'))

    if factor_screen is None:
        factor_screen = factor.notnull()

    # Run pipeline to get factor values and quantiles.
    factor_pipe = Pipeline(
        {'factor': factor,
         'factor_quantile': factor.quantiles(quantiles, mask=factor_screen)},
        screen=factor_screen,
        domain=domain,
    )
    # Put chunksize ~252-504 if you run into memory problems
    factor_results = run_pipeline(factor_pipe, start_date, end_date, chunksize=chunksize)

    # Trailing close-to-close return over the whole window.
    class Daily(CustomFactor):
        inputs = [EquityPricing.close]

        def compute(self, today, assets, out, close):
            out[:] = close[-1] / close[0] - 1

    # Compounded close-to-next-open returns (gains earned outside market hours).
    class Overnight(CustomFactor):
        inputs = [EquityPricing.close, EquityPricing.open]

        def compute(self, today, assets, out, close, open):
            out[:] = np.cumprod(open[1:] / close[:-1], axis=0)[-1] - 1

    # Compounded open-to-close returns (gains earned during market hours).
    class Intraday(CustomFactor):
        inputs = [EquityPricing.close, EquityPricing.open]

        def compute(self, today, assets, out, close, open):
            out[:] = np.cumprod(close / open, axis=0)[-1] - 1

    session_factors = {'Overnight': Overnight, 'Intraday': Intraday, 'Daily': Daily}

    column_order = []
    returns_cols = {}
    for length in returns_lengths:
        colname = '{}D'.format(length)
        column_order.append(colname)
        # Add 1 because "1-day" returns need 2 price observations.
        # Not relevant for Intraday (open and close come from the same bar).
        window_length = length if session == 'Intraday' else length + 1
        # Winsorize returns to handle data glitches.
        # Example: get_pricing("BRK_A", start_date='2014-11-03', end_date='2014-11-08')
        # 0.002 * ~500 (companies per day) = 1 (from each side)
        returns_cols[colname] = (
            session_factors[session](window_length=window_length)
            .winsorize(.002, .998)
        )
    returns_pipe = Pipeline(returns_cols, domain=domain)

    # Compute returns for the period after the factor pipeline, then
    # shift the results back to align with our factor values.
    returns_start_date = start_date
    returns_end_date = end_date + domain.calendar.day * max(returns_lengths)
    # The returns pipeline is always chunked: it spans extra sessions and
    # holds one column per horizon.
    raw_returns = run_pipeline(returns_pipe, returns_start_date, returns_end_date, chunksize=252)

    shifted_returns = {}
    for name, length in zip(column_order, returns_lengths):
        # Shift 1-day returns back by a day, 5-day returns back by 5 days, etc.
        raw = raw_returns[name]
        shifted_returns[name] = backshift_returns_series(raw, length)

    # Merge backshifted returns into a single frame indexed like our desired output.
    merged_returns = pd.DataFrame(
        data=shifted_returns,
        index=factor_results.index,
        columns=column_order,
    )

    # Concat factor results and forward returns column-wise.
    merged = pd.concat([factor_results, merged_returns], axis=1)
    merged.index.set_names(['date', 'asset'], inplace=True)
    return merged.dropna(how='any')
def backshift_returns_series(series, N):
    """Shift a multi-indexed series backwards by N observations in the first level.

    This can be used to convert backward-looking returns into a
    forward-returns series.

    Parameters
    ----------
    series : pd.Series
        Series indexed by a two-level (date, sid) MultiIndex, sorted by date.
    N : int
        Number of leading dates to shift by. Must satisfy ``0 < N < n_dates``.

    Returns
    -------
    pd.Series
        ``series`` with each value re-labeled to the date N observations
        earlier; the last N dates are dropped from the output index.
    """
    ix = series.index
    dates, sids = ix.levels
    # `MultiIndex.labels` was renamed to `codes` in pandas 0.24 and removed
    # in 1.0; support both old and new pandas.
    has_codes = hasattr(ix, 'codes')
    raw_codes = ix.codes if has_codes else ix.labels
    date_labels, sid_labels = map(np.array, raw_codes)

    # Output date labels will contain all but the last N dates.
    new_dates = dates[:-N]

    # Output data will remove the first M rows, where M is the index of the
    # last record with one of the first N dates.
    cutoff = date_labels.searchsorted(N)
    new_date_labels = date_labels[cutoff:] - N
    new_sid_labels = sid_labels[cutoff:]
    new_values = series.values[cutoff:]
    # Sanity check: after the shift, output must start at the first new date.
    assert new_date_labels[0] == 0

    new_index = pd.MultiIndex(
        levels=[new_dates, sids],
        sortorder=1,
        names=ix.names,
        # Pass the integer positions under whichever keyword this pandas
        # version expects.
        **{('codes' if has_codes else 'labels'): [new_date_labels, new_sid_labels]}
    )
    return pd.Series(data=new_values, index=new_index)
# Low Volatility factor
class MyFactor(CustomFactor):
    """Negated trailing volatility: low-volatility names get the highest scores."""
    # Daily returns (each observation spans two consecutive sessions).
    inputs = [Returns(window_length=2)]
    # One trading year of daily observations.
    window_length = 252

    def compute(self, today, assets, out, returns):
        # Per-asset standard deviation of daily returns, ignoring NaNs,
        # negated so that lower volatility ranks higher.
        daily_vol = np.nanstd(returns, axis=0)
        out[:] = -daily_vol
# Yield Factor
from quantopian.pipeline.data import morningstar


class Yield(CustomFactor):
    """Most recent total yield from Morningstar valuation ratios."""
    inputs = [morningstar.valuation_ratios.total_yield]
    window_length = 1

    def compute(self, today, assets, out, syield):
        # Emit the latest available observation in the window.
        out[:] = syield[-1]
Define your factors and filters here:
# Combine a yield factor and a low-volatility factor; each is z-scored and
# winsorized so neither leg dominates via outliers.
yield_factor = Yield().zscore().winsorize(.005, .995)
low_vol_factor = MyFactor().zscore().winsorize(.005, .995)
my_factor = low_vol_factor + yield_factor

# Create a volume filter that filters for stocks in the top 10% companies based on Average Dollar Volume.
avg_dollar_vol = AverageDollarVolume(window_length=63)
volume_filter = avg_dollar_vol.percentile_between(90, 100, mask=(avg_dollar_vol > 0))

# Call evaluate_factor on your factor to get Alphalens-formatted data.
al_data = evaluate_factor(
    factor=my_factor,
    domain=US_EQUITIES,
    start_date='2010-01-01',
    end_date='2017-11-06',
    factor_screen=volume_filter & Q500US(),  # Remove Q500US() if using non-US market
    session="Intraday",  # Can be "Overnight", "Intraday", "Daily"
    quantiles=5,
    returns_lengths=(1, 5, 10),
    chunksize=None,  # Put chunksize ~252-504 if you run into memory problems
)
Results index reference: t(0) is the date level of the results index — the session on which each factor value was computed; forward returns are aligned back to that date.
# Import Alphalens and run our factor data through a tear sheet.
from alphalens.tears import create_full_tear_sheet
create_full_tear_sheet(al_data)
# Display the Alphalens-formatted frame (notebook cell output).
al_data
Plot the number of companies per day.
Pyfolio analysis:
from alphalens.performance import create_pyfolio_input
import alphalens
import pyfolio

# Convert the Alphalens factor data into pyfolio-compatible returns,
# positions, and benchmark series for a simulated long/short portfolio.
pf_returns, pf_positions, pf_benchmark = \
    create_pyfolio_input(al_data,
                         period='1D',
                         capital=1000000,
                         long_short=True,
                         group_neutral=False,
                         equal_weight=False,  # Equal weight vs weight based on alpha factor
                         quantiles=[0,4],  # Choose the "best" quantiles to trade based on your analysis above
                         groups=None,
                         benchmark_period='1D')

# NOTE(review): this import shadows alphalens.tears.create_full_tear_sheet
# imported earlier — fine across sequential notebook cells, confusing in a
# single module.
from pyfolio.tears import create_full_tear_sheet
create_full_tear_sheet(pf_returns,
                       positions=pf_positions,
                       benchmark_rets=pf_benchmark,
                       round_trips=True)