In this notebook, we generate two example alpha factors from the information provided by cointegrated pairs. A pair of stocks is called cointegrated if their prices move together over time and the distance between their prices is stable. There are Quantopian lectures on the mathematical meaning of cointegration and the pairs trading strategy. If you are interested, please check them out, but they are not a prerequisite for this notebook. Those lectures introduce the general idea of exploiting price divergences between pairs. This method has been around for a long time, and we in fact have our own implementation of it.
In this notebook, we want to introduce a different way to use pairs. The basic idea is that if some event happens to one leg of a pair, such as an earnings announcement that beats estimates, the market is more likely to price in a higher chance of the same thing happening to the other leg, i.e. that the other leg will also beat estimates. This notebook develops the idea with two simple examples. They are meant to give you a starting point; we ask you to use your creativity to come up with novel ideas and submit them to us.
In addition to a different take on pairs trading, this notebook also shows how a factor can be constructed from pair information. The usual logic is to go long/short the stocks directly when their prices diverge; here, instead, we want to assign individual scores.
import pandas as pd
import numpy as np
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline
from quantopian.pipeline.factors import BusinessDaysSincePreviousEvent
# It is helpful to split the data into a training set and a test set when designing an algorithm.
# The date range used in this notebook is the training set.
training_start_date = '2012-01-04'
training_end_date = '2018-06-11'
from zipline.utils.calendars import get_calendar
calendar = get_calendar('NYSE')
trading_calendar = calendar.all_sessions
trading_calendar_index = trading_calendar[(trading_calendar >= training_start_date) \
& (trading_calendar <= training_end_date)]
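The boolean slicing above can be illustrated on a plain pandas DatetimeIndex. This is a toy sketch: a weekday range stands in for the NYSE session calendar, which additionally excludes exchange holidays.

```python
import pandas as pd

# Toy stand-in for `trading_calendar`: weekdays only (real NYSE sessions
# also exclude holidays; this just illustrates the boolean slicing).
sessions = pd.date_range('2012-01-02', '2012-01-13', freq='B')

start, end = '2012-01-04', '2012-01-11'
index = sessions[(sessions >= start) & (sessions <= end)]
print(index)  # 6 sessions: Jan 4, 5, 6, 9, 10, 11
```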
The data we provide here covers most of the pairs we identified beforehand. There will be more information on how that was done, but those details are not important for this exercise; we simply accept the pairs as given. The pair information is encoded in a .csv file that can be uploaded as self-serve data to build the Pipeline. First, however, let's have a brief look at the data.
The column trade_date_minus_one is the date when we discovered the pair, and the column trade_date is the point at which we can trade the pair. In order to encode the pair information in a way that self-serve accepts, we had to get a little creative. Basically, if two stocks constitute a pair, they have the same number in one of the group columns. You can see that ABM and ADSK are a pair because they both have a 1 in the column group_1. A stock can be in more than one pair. For this reason, we encoded the information in multiple columns group_i. For example, you can see in column group_2 that ABM is paired not just with ADSK, but also with APH. The number 0 is a placeholder meaning no pair. The group number is not meant to reflect the relative strength of cointegration; the pairs in different groups are equally important.
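The encoding can be sketched with a toy pandas DataFrame; the rows below are hypothetical, mirroring the tickers mentioned above rather than the actual file contents.

```python
import pandas as pd

# Hypothetical rows mimicking the self-serve file: ABM/ADSK share the value 1
# in group_1, and ABM/APH share the value 1 in group_2. 0 means "no pair".
encoding = pd.DataFrame({
    'group_1': [1, 1, 0],
    'group_2': [1, 0, 1],
}, index=['ABM', 'ADSK', 'APH'])

# Recover the pair in group_1: the tickers sharing the same non-zero value.
g1 = encoding['group_1']
pair = tuple(g1[g1 == 1].index)
print(pair)  # ('ABM', 'ADSK')
```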
Now, we are ready to upload pairs_self_serve_dataset.csv as a self-serve dataset on the Q website. To quickly learn how to do that, please refer to Upload Your Custom Datasets and Signals with Self-Serve Data.
Note: When uploading pairs_self_serve_dataset.csv as a self-serve dataset, set trade_date_minus_one as the Primary Date and ticker as the Primary Asset. The column types of trade_date_minus_one, trade_date, ticker, and group_i are date, datetime, string, and number, respectively.
Let's run the corresponding Pipeline that gets the dataset we uploaded. First, we need to run the import statement:
# from quantopian.pipeline.data.user_[user_ID] import [dataset name]
from quantopian.pipeline.data.user_57e2b12557e9c947ce001019 import pairs_self_serve_dataset
# Columns in pairs_self_serve_dataset:
pairs_self_serve_dataset.columns
There are 17 groups of pairs. First, let's take a look at the data in the first 3 groups.
# Check the number of days since the data was last updated. Here, we consider the
# pairs_self_serve_dataset to be fresh if it is no more than 1 business day old.
is_pairs_data_fresh = (BusinessDaysSincePreviousEvent(inputs=[pairs_self_serve_dataset.asof_date]) <= 1)
pipe = Pipeline(
    columns={
        'group_1': pairs_self_serve_dataset.group_1.latest,
        'group_2': pairs_self_serve_dataset.group_2.latest,
        'group_3': pairs_self_serve_dataset.group_3.latest,
        'trade_date': pairs_self_serve_dataset.trade_date.latest,
        'asof_date': pairs_self_serve_dataset.asof_date.latest,
    },
    screen=pairs_self_serve_dataset.trade_date.latest.notnull() & is_pairs_data_fresh
)
df = run_pipeline(pipe, training_start_date, training_end_date)
df.head()
As we can see, the date we run the pipeline for is the trade_date, and the asof_date is the trade_date_minus_one.
We can transform this into daily lists of pairs. For example, when trade_date is 2012-01-05, we can obtain the list of pairs in group_1 as follows:
date = '2012-01-05'
df_temp = df.xs(date, level=0)
group_1 = df_temp.loc[:, 'group_1']
pairs = group_1[group_1!=0] # exclude the assets with values equal to 0
pairs.head()
list_pairs = []
for _, pair in pairs.groupby(pairs):  # assets that share the same value form a pair
    list_pairs.append((pair.index[0].symbol, pair.index[1].symbol))
# First 5 elements of the cointegrated pairs:
list_pairs[:5]
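The same extraction generalizes to a loop over several group columns. Here is a self-contained sketch on made-up data; the tickers and group values are hypothetical.

```python
import pandas as pd

# Toy frame with two group columns (hypothetical tickers and values).
df_temp = pd.DataFrame({
    'group_1': [1, 1, 2, 2, 0],
    'group_2': [1, 0, 0, 1, 0],
}, index=['ABM', 'ADSK', 'XYZ', 'APH', 'QQQ'])

all_pairs = []
for col in ['group_1', 'group_2']:
    groups = df_temp[col]
    groups = groups[groups != 0]            # drop the 0 placeholder
    for _, pair in groups.groupby(groups):  # tickers sharing a value form a pair
        if len(pair) == 2:                  # skip pairs with a missing leg
            all_pairs.append(tuple(pair.index))
print(all_pairs)  # [('ABM', 'ADSK'), ('XYZ', 'APH'), ('ABM', 'APH')]
```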
earnings_surprise Computed with Factset Data
To get each asset's earnings surprise, we use code borrowed from New Data: Factset Estimates.
import quantopian.pipeline.data.factset.estimates as fe
# Slice the PeriodicConsensus and Actuals DataSetFamilies into DataSets. In this context,
# fq0_eps_cons is a DataSet containing consensus estimates data about EPS for the
# most recently reported fiscal quarter. fq0_eps_act is a DataSet containing the actual
# reported EPS for the most recently reported quarter.
fq0_eps_cons = fe.PeriodicConsensus.slice('EPS', 'qf', 0)
fq0_eps_act = fe.Actuals.slice('EPS', 'qf', 0)
# Get the latest mean consensus EPS estimate for the last reported quarter.
fq0_eps_cons_mean = fq0_eps_cons.mean.latest
# Get the EPS value from the last reported quarter.
fq0_eps_act_value = fq0_eps_act.actual_value.latest
# Define a surprise factor to be the relative difference between the estimated and
# reported EPS.
fq0_surprise = (fq0_eps_act_value - fq0_eps_cons_mean) / fq0_eps_cons_mean
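As a quick sanity check of the surprise formula, a worked example with made-up numbers:

```python
# Hypothetical values: consensus EPS estimate of $1.00, actual reported EPS of $1.07.
consensus = 1.00
actual = 1.07
surprise = (actual - consensus) / consensus
print(surprise)  # ~0.07, i.e. a 7% positive surprise (beat)
```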
The idea of the alpha factor is that if one stock of a cointegrated pair has a positive earnings surprise, then the other stock from the same pair may have a positive move in price. Let us define the corresponding custom factor.
# Here, we consider the earnings surprise data to be fresh if it's less than 30 days old,
# but this window could be made longer or shorter.
is_es_data_fresh = (BusinessDaysSincePreviousEvent(inputs=[fq0_eps_act.asof_date]) <= 30)
class Pairs_ES(CustomFactor):
    window_length = 1
    # inputs: one group of pairs and the earnings surprise data
    def compute(self, today, asset_ids, out, one_group_pairs, earnings_surprises):
        # Store the input data, one_group_pairs, in a pandas Series indexed by sid.
        pairs = pd.Series(one_group_pairs[0, :], index=asset_ids)
        # Exclude the assets with values equal to 0 (the no-pair placeholder).
        pairs = pairs[pairs != 0]
        # Store the input data, earnings_surprises, in a pandas Series indexed by sid.
        earnings_surprises = pd.Series(earnings_surprises[0, :], index=asset_ids)
        output = pd.Series(index=asset_ids)
        # Find the assets that share the same value; their sids are leg_1 and leg_2.
        for _, pair in pairs.groupby(pairs):
            # Skip pairs with a leg that was excluded from the pipeline.
            if len(pair.index) == 2:
                (leg_1, leg_2) = pair.index
                # Assign leg 2's earnings surprise score to leg 1...
                output[leg_1] = earnings_surprises[leg_2]
                # ...and leg 1's earnings surprise score to leg 2.
                output[leg_2] = earnings_surprises[leg_1]
        out[:] = output.values
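The heart of the compute method, the score swap, can be demonstrated outside the pipeline with plain pandas. The sids and surprise values below are hypothetical.

```python
import pandas as pd

# Hypothetical sids 101/102 paired together (shared value 1); sid 103 unpaired.
pairs = pd.Series([1, 1, 0], index=[101, 102, 103])
surprises = pd.Series([0.05, -0.02, 0.10], index=[101, 102, 103])

pairs = pairs[pairs != 0]
output = pd.Series(index=[101, 102, 103], dtype=float)
for _, pair in pairs.groupby(pairs):
    if len(pair.index) == 2:
        leg_1, leg_2 = pair.index
        output[leg_1] = surprises[leg_2]  # each leg receives the *other* leg's score
        output[leg_2] = surprises[leg_1]
print(output)  # 101 -> -0.02, 102 -> 0.05, 103 -> NaN (unpaired)
```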
Then, we can define a pipeline that runs this custom factor for the first group of pairs, and include the group 1 values and the earnings surprise data for reference.
def make_pipeline_es_sample():
    pipe = Pipeline()
    pipe.add(fq0_surprise, 'es_data')  # earnings surprises
    pipe.add(pairs_self_serve_dataset.group_1.latest, 'group_1')  # group 1 of pairs_self_serve_dataset
    earnings_surprises_group_1 = Pairs_ES(
        inputs=[pairs_self_serve_dataset.group_1.latest,
                fq0_surprise],
        mask=is_pairs_data_fresh
    )
    pipe.add(earnings_surprises_group_1, 'group_1_es')
    pipe.set_screen(pairs_self_serve_dataset.trade_date.latest.notnull()
                    & is_es_data_fresh & earnings_surprises_group_1.notnull())
    return pipe
pipe_es_sample_output = run_pipeline(make_pipeline_es_sample(), training_start_date, training_end_date)
pipe_es_sample_output.head()
Since there are multiple groups in pairs_self_serve_dataset and some stocks may appear in multiple groups (i.e. some stocks are shared by different cointegrated pairs, e.g. (A, B) and (A, C)), we would like to combine the results computed by the custom factor Pairs_ES across all groups. We define another custom factor named MeanFactor that combines them by taking their average value as the alpha factor.
# Compute the mean factor across groups.
class MeanFactor(CustomFactor):
    window_length = 1
    def compute(self, today, asset_ids, out, *inputs):
        output = pd.DataFrame(index=asset_ids)
        for i in range(len(inputs)):
            output['col_' + str(i)] = inputs[i][0, :]
        # DataFrame.mean skips NaNs, so an asset missing from some groups
        # gets the average over only the groups it appears in.
        out[:] = output.mean(axis=1).values
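A small standalone example of this averaging, with hypothetical sids and scores, shows why the NaN handling matters:

```python
import pandas as pd
import numpy as np

# Per-group scores for three hypothetical assets; NaN marks "not in this group".
output = pd.DataFrame({
    'col_0': [0.04, np.nan, np.nan],
    'col_1': [0.02, 0.10, np.nan],
}, index=[101, 102, 103])

# DataFrame.mean skips NaNs by default: an asset appearing in only one
# group keeps that group's score unchanged.
combined = output.mean(axis=1)
print(combined)  # 101 -> 0.03, 102 -> 0.10, 103 -> NaN
```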
def make_pipeline_es():
    pipe = Pipeline()
    earnings_surprises = np.asarray([])  # store the Pairs_ES factor for each group
    for col in pairs_self_serve_dataset.columns:
        if 'group' in col.name:  # loop over the group columns
            earnings_surprises_per_group = Pairs_ES(
                inputs=[col.latest, fq0_surprise],
                mask=is_pairs_data_fresh
            )
            earnings_surprises = np.append(earnings_surprises,
                                           earnings_surprises_per_group)
    alpha_factor = MeanFactor(inputs=earnings_surprises).winsorize(0.2, 0.98)  # compute the mean factor
    pipe.add(alpha_factor, 'es')
    screen_zeros = (alpha_factor != 0.0)
    screen = (screen_zeros & pairs_self_serve_dataset.trade_date.latest.notnull()
              & is_es_data_fresh & alpha_factor.notnull())
    pipe.set_screen(screen)
    return pipe
pipe_es_output = run_pipeline(make_pipeline_es(), training_start_date, training_end_date)
Before running this alpha factor in the algorithm environment, we can use alphalens to analyze it and perhaps tweak it.
trading_day_delta = calendar.day
px = get_pricing(
    pipe_es_output.index.get_level_values(1).unique(),
    pd.Timestamp(training_start_date),
    pd.Timestamp(training_end_date) + 252*trading_day_delta,
    fields=['open_price']
).iloc[0, :, :]
from alphalens.utils import get_clean_factor_and_forward_returns
factor_data = get_clean_factor_and_forward_returns(
factor=pipe_es_output['es'],
prices=px,
bins=(-1000,0,1000),
quantiles=None,
periods=range(1,35,5)
)
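The bins argument partitions the factor values into fixed-edge buckets rather than equal-count quantiles. A plain pd.cut call, on made-up factor values, illustrates what edges like (-1000, 0, 1000) do: the wide outer bounds simply cover the factor's range, so the effective split is at zero (negative vs. positive).

```python
import pandas as pd

# Hypothetical factor values straddling zero.
factor_values = pd.Series([-0.3, -0.01, 0.02, 0.5])

# Two fixed-edge buckets: (-1000, 0] and (0, 1000].
buckets = pd.cut(factor_values, bins=(-1000, 0, 1000), labels=[1, 2])
print(buckets.tolist())  # [1, 1, 2, 2]
```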
# Show the first 5 rows of factor_data.
factor_data.head(5)
from alphalens.performance import mean_information_coefficient
mean_information_coefficient(factor_data).plot(title="IC Decay");
from alphalens.tears import create_full_tear_sheet
create_full_tear_sheet(factor_data=factor_data)
# Since the earnings data is quarterly, factor_data may not have values for every trading day.
# That is why there are some NaN values and disconnected lines in the tearsheet.
stocktwits
This alpha factor is designed to be the average of the sentiment scores of the stocks in the same cointegrated pair. The idea is that a stock's alpha score here is influenced not only by its own sentiment score but also by the other leg's sentiment score.
from quantopian.pipeline.data.psychsignal import stocktwits
Let us define a custom factor to compute the average sentiment score of the pair legs. Then we can borrow the code from the previous example to build the pipeline and run the alphalens tearsheet.
# The alpha scores of the assets in the same pair are designed to be their average sentiment score.
class Pairs_Sentiment(CustomFactor):
    window_length = 1
    def compute(self, today, asset_ids, out, pair_groups, sentiment_score):
        pairs = pd.Series(pair_groups[0, :], index=asset_ids)
        pairs = pairs[pairs != 0]
        sentiment_score = pd.Series(sentiment_score[0, :], index=asset_ids)
        output = pd.Series(index=asset_ids)
        for _, pair in pairs.groupby(pairs):
            if len(pair.index) == 2:
                (leg_1, leg_2) = pair.index
                # Assign both legs the average of their sentiment scores.
                output[leg_1] = (sentiment_score[leg_1] + sentiment_score[leg_2]) / 2
                output[leg_2] = (sentiment_score[leg_1] + sentiment_score[leg_2]) / 2
        out[:] = output.values
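The averaging step can be checked on toy data; the sids and sentiment scores below are hypothetical.

```python
import pandas as pd

# Hypothetical sids 201/202 paired together (shared value 1).
pairs = pd.Series([1, 1], index=[201, 202])
sentiment = pd.Series([0.5, -0.25], index=[201, 202])

output = pd.Series(index=pairs.index, dtype=float)
for _, pair in pairs.groupby(pairs):
    if len(pair.index) == 2:
        leg_1, leg_2 = pair.index
        # Both legs receive the mean of the two sentiment scores.
        avg = (sentiment[leg_1] + sentiment[leg_2]) / 2
        output[leg_1] = avg
        output[leg_2] = avg
print(output.tolist())  # [0.125, 0.125]
```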
def make_pipeline_sentiment():
    pipe = Pipeline()
    sentiment_scores = np.asarray([])
    for col in pairs_self_serve_dataset.columns:
        if 'group' in col.name:
            sentiment_scores_per_group = Pairs_Sentiment(
                inputs=[col.latest, stocktwits.bull_minus_bear.latest],
                mask=is_pairs_data_fresh
            )
            sentiment_scores = np.append(sentiment_scores, sentiment_scores_per_group)
    alpha_factor = MeanFactor(inputs=sentiment_scores)
    pipe.add(alpha_factor, 'sentiment_score')
    screen_zeros = (alpha_factor != 0.0)
    screen = (pairs_self_serve_dataset.trade_date.latest.notnull()
              & alpha_factor.notnull()
              & screen_zeros)
    pipe.set_screen(screen)
    return pipe
pipe_sentiment_output = run_pipeline(make_pipeline_sentiment(), training_start_date, training_end_date)
trading_day_delta = calendar.day
px = get_pricing(
    pipe_sentiment_output.index.get_level_values(1).unique(),
    pd.Timestamp(training_start_date),
    pd.Timestamp(training_end_date) + 252*trading_day_delta,
    fields=['open_price']
).iloc[0, :, :]
from alphalens.utils import get_clean_factor_and_forward_returns
factor_data = get_clean_factor_and_forward_returns(
factor=pipe_sentiment_output['sentiment_score'],
prices=px,
bins=(-100,0,100),
quantiles=None,
periods=range(20,180,30)
)
# Show the first 5 rows of factor_data.
factor_data.head(5)
from alphalens.tears import create_full_tear_sheet
create_full_tear_sheet(factor_data=factor_data)
Thanks to Saba Nejad, Jamie McCorriston and Thomas Wiecki for their contribution to this notebook.