from quantopian.pipeline.factors import Returns
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline, symbols
from quantopian.pipeline.data.builtin import EquityPricing
from alphalens.tears import create_full_tear_sheet, create_returns_tear_sheet
from alphalens.performance import mean_information_coefficient
from quantopian.pipeline.domain import (
AT_EQUITIES, # Austria
AU_EQUITIES, # Australia
BE_EQUITIES, # Belgium
BR_EQUITIES, # Brazil
CA_EQUITIES, # Canada
CH_EQUITIES, # Switzerland
CN_EQUITIES, # China
DE_EQUITIES, # Germany
DK_EQUITIES, # Denmark
ES_EQUITIES, # Spain
FI_EQUITIES, # Finland
FR_EQUITIES, # France
GB_EQUITIES, # Great Britain
HK_EQUITIES, # Hong Kong
IE_EQUITIES, # Ireland
IN_EQUITIES, # India
IT_EQUITIES, # Italy
JP_EQUITIES, # Japan
KR_EQUITIES, # South Korea
NL_EQUITIES, # Netherlands
NO_EQUITIES, # Norway
NZ_EQUITIES, # New Zealand
PT_EQUITIES, # Portugal
SE_EQUITIES, # Sweden
SG_EQUITIES, # Singapore
US_EQUITIES, # United States
)
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import math
import time
###########################params##################################
# Backtest window as ISO date strings; converted to actual trading sessions
# of the chosen domain's calendar further below.
start_date = '2005-01-01'
end_date = '2019-01-01'
# Pipeline domain: German equities.
domain = DE_EQUITIES
#-------------------------Volume Restriction for stocks------------
# Keep only stocks whose *minimum* daily volume over the last `vol_window`
# sessions lands in the top `top_vol_perc` percent of all stocks.
top_vol_perc = 30
vol_window = 20  # 4 weeks of trading days
#in this case we look for stocks whose minimum daily volume of the recent 20 businessdays was in the top 30% of all stocks
#-------------------------MA Crossover params----------------------
window_length_MA1 = 50
window_length_MA2 = 200  # set this one as the bigger one
# Weight functions for the moving averages; normalization happens later in
# the factor code.  A constant 1 reproduces a plain SMA.
# (PEP 8: named functions via `def` rather than lambdas bound to names.)
def weightfunct_MA1(t):
    # Emphasize recent data: t runs from 0 (oldest row of the lookback) to
    # window_length_MA1 - 1 (newest), so the denominator shrinks — and the
    # weight grows — as t increases.
    return 1 / math.sqrt(-t + window_length_MA1 + 1)
def weightfunct_MA2(t):
    # Constant weight -> simple moving average.
    return 1
#-------------------------forward returns we want to know----------
forwardDays = [1, 2, 3, 4, 5]
# The weight function of MA1 looks as follows. The idea is that the average
# becomes more sensitive to the most recent data, since recent data should be
# more representative of the current mean than data from long ago.
# Visualize the (unnormalized) MA1 weight at every offset of its lookback window.
pd.DataFrame(np.fromfunction(np.vectorize(lambda t : weightfunct_MA1(t)), [window_length_MA1])).plot()
# Directly convert the date strings into trading sessions of the domain's
# calendar.  NOTE(review): assumes minute_to_session_label snaps these
# midnight-UTC timestamps to the intended sessions — confirm against the
# domain calendar's session labeling.
start_date = domain.calendar.minute_to_session_label(pd.Timestamp(start_date,tz='UTC'))
end_date = domain.calendar.minute_to_session_label(pd.Timestamp(end_date,tz='UTC'))
# Now we want to screen for the highest-volume stocks to avoid most liquidity problems:
# Factor yielding each asset's lowest daily volume over the last vol_window sessions.
class MinVolume(CustomFactor):
    """Minimum of EquityPricing.volume across the trailing `vol_window` days."""
    inputs = [EquityPricing.volume]
    window_length = vol_window

    def compute(self, today, assets, out, vol):
        # Column-wise minimum over the lookback (rows = days, columns = assets).
        out[:] = vol.min(axis=0)
# Volume filter: keep assets whose minimum volume sits in the top
# `top_vol_perc` percent of the universe.  The mask (min_vol > 0) makes sure
# we never get stocks that failed to trade on some day of the window.
min_vol = MinVolume()
top_vol = min_vol.percentile_between(100 - top_vol_perc, 100, mask = (min_vol >0))
# Here is our custom weighted-mean factor. Since we can't modify its __init__(),
# here is a small workaround for it:
"""
weightfunc: a weightfunction of the form lambda i : <comutation> to weight the elements inside the window
volume_weighted: boolean weither to also weight by volume
"""
def makeWeightedMean(weightFunct, volume_weighted, window_len):
    """Build and instantiate a CustomFactor computing a weighted mean of close prices.

    Parameters:
        weightFunct: callable ``t -> weight`` for lookback offset t
            (t = 0 is the oldest row of the window).
        volume_weighted: if True, each day's weight is additionally scaled
            by that day's trading volume.
        window_len: lookback length in trading sessions.

    Days with a NaN close (or NaN volume in the volume-weighted case) get a
    weight of zero, so the remaining weights still normalize to one per asset.
    """
    # Pre-compute the raw per-offset weights once, as a (window_len, 1)
    # column vector so it broadcasts across assets inside compute().
    raw_weights = np.fromfunction(
        np.vectorize(lambda i, j: weightFunct(i)), (window_len, 1))
    if volume_weighted:
        class WeightedMean(CustomFactor):
            inputs = [EquityPricing.close, EquityPricing.volume]
            window_length = window_len
            mask = top_vol
            weights = raw_weights

            def compute(self, today, asset_ids, out, close, volume):
                valid = ~np.isnan(close) & ~np.isnan(volume)
                # Scale the function weights by volume, then zero out invalid
                # rows.  np.where (rather than multiplying by the boolean
                # mask) is required: NaN * 0 is still NaN and would poison
                # the column sums used for normalization below.
                w = np.where(valid, self.weights * volume, 0.0)
                w = w / w.sum(axis=0)  # normalize the weights per asset
                # NaN closes carry zero weight, so nansum yields the
                # weighted mean over the valid days only.
                out[:] = np.nansum(w * close, axis=0)
    else:
        class WeightedMean(CustomFactor):
            inputs = [EquityPricing.close]
            window_length = window_len
            mask = top_vol
            weights = raw_weights

            def compute(self, today, asset_ids, out, close):
                valid = ~np.isnan(close)
                # Zero the weights of NaN closes so normalization only
                # counts valid days.
                w = np.where(valid, self.weights, 0.0)
                w = w / w.sum(axis=0)  # normalize the weights per asset
                out[:] = np.nansum(w * close, axis=0)
    return WeightedMean()
def makePipeline():
    """Pipeline with a single 'diff' column: weighted fast MA minus slow SMA."""
    fast_ma = makeWeightedMean(weightfunct_MA1, True, window_length_MA1)
    slow_ma = makeWeightedMean(weightfunct_MA2, False, window_length_MA2)
    return Pipeline(
        columns={'diff': fast_ma - slow_ma},
        screen=top_vol,
        domain=domain,
    )
# Start one session early — presumably so the day-over-day crossover diff
# computed below has a previous row for the first real date (the [1:] slicing
# drops one day again); confirm intent.
result = run_pipeline(makePipeline(), start_date=start_date - domain.calendar.day, end_date=end_date)
# Since the moving averages aren't window safe, we calculate the crossover
# outside of the pipeline:
# Pivot assets into columns (MultiIndex (date, asset) -> date rows) so we can
# compare consecutive days with plain array slicing.
result_unst = result.unstack()
# NOTE(review): the names and original comments look inverted relative to the
# code: `cross_down` fires when 'diff' goes from negative to positive (MA1
# crossing ABOVE MA2) and `cross_up` the other way around.  Verify whether
# the resulting sign of `cross_combined` is intentional.
cross_down = (result_unst.values[1:] >= 0) & (result_unst.values[:-1] < 0)  # yesterday diff < 0, today diff >= 0
cross_up = (result_unst.values[1:] < 0) & (result_unst.values[:-1] >= 0)  # yesterday diff >= 0, today diff < 0
# Re-attach the date index (the first day is consumed by the diff) and make DataFrames again.
cross_down = pd.DataFrame(cross_down, index = result_unst.index[1:], columns=result_unst.columns)
cross_up = pd.DataFrame(cross_up, index = result_unst.index[1:], columns=result_unst.columns)
# and now we stack the (date, asset) index back again
cross_down = cross_down.stack(dropna = False)
cross_up = cross_up.stack(dropna = False)
cross_up.columns = ["factor"]
cross_down.columns = ["factor"]
# Combined signal: +1 where cross_up fired, -1 where cross_down fired, 0 otherwise.
cross_combined = cross_up.astype(float) - (cross_down.astype(float))
# I don't know whether there is a built-in function to merge the data with the
# forward returns for boolean factors. That's why we do the merging by hand.
# First we generate the forward returns:
# Build one forward-return factor per horizon plus matching column labels.
columns = [str(d) + 'D' for d in forwardDays]
returns_factors = {label: Returns(window_length=d + 1, mask=top_vol)
                   for label, d in zip(columns, forwardDays)}
returns_pipeline = Pipeline(returns_factors, domain=domain, screen=top_vol)
returns_start = start_date
# Extend the end by enough sessions to fully populate the longest horizon.
returns_end = end_date + domain.calendar.day * max(forwardDays)
returns = run_pipeline(returns_pipeline, returns_start, returns_end)
# Pipeline does not preserve the dict's column order (even an ordered dict
# would not help), so restore ours explicitly.
returns = returns.reindex(columns=columns)
# Now we want to shift the returns, so that on date x we get the future
# returns. Again, to avoid the inconvenience of the MultiIndex, we use unstack().
# Dates for which we have crossover observations.
dateIndex = cross_up.index.levels[0]
# Pre-allocate the target frame: one column block per horizon, one sub-column
# per asset seen in the returns pipeline.
returns_shifted = pd.DataFrame(index = dateIndex,
columns = pd.MultiIndex.from_product([columns, returns.index.levels[1]]))
returns_unst = returns.unstack()
for days in forwardDays:
    # Dropping the first `days` rows and the common tail shifts the trailing
    # return into a forward return with equal length for every horizon.
    cutoff = days - max(forwardDays)
    if cutoff == 0:  # this special case would break the slice notation [days:0]
        cutoff = None
    current = returns_unst[str(days) + 'D'].iloc[days : cutoff]  # cut off the data
    # Re-label with the crossover dates.  Assumes the row counts line up —
    # TODO confirm len(returns_unst) - max(forwardDays) == len(dateIndex).
    current.index = dateIndex
    returns_shifted[str(days) + "D"] = current
returns_shifted = returns_shifted.stack(dropna = False)
returns_shifted = returns_shifted.reindex(columns= columns)  # fix the messed-up column order
# Map the {-1, 0, +1} crossover signal to quantile labels {0, 1, 2} for alphalens.
cross_combined_quantiles = (cross_combined + 1).astype(int)
cross_combined_quantiles.columns = ["factor_quantile"]
# Assemble the alphalens input frame: factor value, factor quantile, and the
# forward-return columns; dropna removes rows without a complete set.
merged = pd.concat([cross_combined, cross_combined_quantiles, returns_shifted], axis = 1).dropna()
merged.index.set_names(['date', 'asset'], inplace=True)
# Give the date level a business-day frequency.  NOTE(review): mutating
# index.levels[0].freq in place is fragile in newer pandas — verify.
merged.index.levels[0].freq = pd.tseries.offsets.BDay()
create_full_tear_sheet(merged)
# To get rid of the unequally sized quantiles, one could drop the middle one.
# Strangely, this also alters the results of the remaining quantiles in alphalens:
# Keep only rows where a crossover actually fired (drop the 0 middle quantile).
cross_combined_quantiles = cross_combined[cross_combined['factor'] != 0]
# Map the remaining {-1, +1} signal to quantile labels {0, 1}.
cross_combined_quantiles += 1
cross_combined_quantiles /=2
cross_combined_quantiles = cross_combined_quantiles.astype(int)
# Align factor values and forward returns with the filtered rows.
cross_combined = cross_combined.reindex(cross_combined_quantiles.index)
returns_shifted = returns_shifted.reindex(cross_combined_quantiles.index)
cross_combined_quantiles.columns = ["factor_quantile"]
# Same alphalens input assembly as above, now with only two quantiles.
merged = pd.concat([cross_combined, cross_combined_quantiles, returns_shifted], axis = 1).dropna()
merged.index.set_names(['date', 'asset'], inplace=True)
# NOTE(review): mutating index.levels[0].freq in place is fragile in newer pandas.
merged.index.levels[0].freq = pd.tseries.offsets.BDay()
create_full_tear_sheet(merged)