pip install arch
pip install -U statsmodels
pip install datapackage
pip install pandas_datareader
pip install pandas
pip install yfinance --upgrade --no-cache-dir
import re
import os
import csv
import sqlite3
import yfinance as yf
import pandas as pd
import numpy as np
import statsmodels
import time
import datetime
import matplotlib.pyplot as plt
import statsmodels.api as sm
from datapackage import Package
from pandas_datareader import data as pdr
from statsmodels.tsa.stattools import coint
from statsmodels.tsa.stattools import adfuller
from sklearn.linear_model import LinearRegression
from arch.unitroot import PhillipsPerron
yf.pdr_override()
# NOTE: in practice this always returns True, so it is unclear whether there is a reason to use this function.
def PhillipsPerronTest(series):
    """Run a Phillips-Perron test on the log-differences of ``series``.

    The values are shifted by +500 before taking logarithms, because values
    that are zero or negative cannot be log-transformed. The shift is applied
    to a copy, so the caller's array or Series is left unmodified.

    Args:
        series: numeric sequence (list, NumPy array or pandas Series).

    Returns:
        True when the test statistic is below the 5% critical value and the
        p-value is below 0.001, otherwise False.
    """
    # The addition builds a new array; an in-place `+=` would silently
    # change the data owned by the caller.
    shifted = np.asarray(series, dtype=float) + 500
    inflation = np.diff(np.log(shifted))
    pp = PhillipsPerron(inflation)
    return bool(pp.stat < pp.critical_values['5%'] and pp.pvalue < 0.001)
def plotLinearRegression(S1, S2):
    """Plot the regression spread ``S2 - b * S1`` together with its mean.

    ``b`` is the slope of an ordinary-least-squares fit of ``S2`` on ``S1``
    (with an intercept). The spread and a black horizontal line at its mean
    are drawn with matplotlib's current pyplot state.

    Args:
        S1: pandas Series used as the explanatory variable.
        S2: pandas Series used as the dependent variable.
    """
    # Keep only the rows where both series have a value. Dropping NaNs from
    # each series separately can leave them with different lengths, or with
    # rows that no longer correspond to each other, which breaks the fit.
    joint = pd.concat([S1, S2], axis=1, join='inner').dropna()
    x = joint.iloc[:, 0]
    y = joint.iloc[:, 1]

    exog = sm.add_constant(x)
    results = sm.OLS(y, exog).fit()
    b = results.params[x.name]

    spread = y - b * x
    spread.plot()
    plt.axhline(spread.mean(), color='black')
    plt.legend(['Spread'])
def plotPriceRatio(S1, S2):
    """Plot the ratio ``S1 / S2`` and a black horizontal line at its mean.

    NaN values are dropped from each series before dividing.
    """
    numerator = S1.dropna()
    denominator = S2.dropna()
    ratio = numerator / denominator

    ratio.plot()
    mean_level = ratio.mean()
    plt.axhline(mean_level, color='black')
    plt.legend(['Price Ratio'])
def zscore(series):
    """Return ``series`` centred on its mean and divided by ``np.std(series)``."""
    centred = series - series.mean()
    scale = np.std(series)
    return centred / scale
def plotZscore(series, up=2.0, down=-2.0):
    """Plot the z-score of ``series`` with its mean and two threshold lines.

    Args:
        series: data passed to ``zscore``; the result must support ``.plot()``.
        up: level of the red dashed line.
        down: level of the green dashed line.
    """
    standardized = zscore(series)
    standardized.plot()
    plt.axhline(standardized.mean(), color='black')
    for level, colour in ((up, 'red'), (down, 'green')):
        plt.axhline(level, color=colour, linestyle='--')
    plt.legend(['Spread z-score', 'Mean', up, down])
# Filter stock pairs that meet the correlation and cointegration conditions.
# 'dataSet' is the set of initial stocks; it must be a pandas DataFrame.
# 'corrThreshold' is the minimum correlation a pair must reach to pass the filter.
# 'limitResult' is the number of filtered results we want to obtain. Once this
# number of filtered pairs has been found, the function stops running and returns the result.
# If it equals -1, the search covers all available results.
# 'debug': if True, output all debug-related data.
def filterPairs(dataSet, corrThreshold=0.70, limitResult=-1, debug=False):
    """Return the column pairs of ``dataSet`` that pass a correlation and ADF screen.

    Every column is compared once with each column that follows it. A pair is
    kept when the correlation of the two columns is at least ``corrThreshold``
    and the augmented Dickey-Fuller test on their plain difference yields a
    statistic below the 5% critical value together with a p-value below 0.001.

    Args:
        dataSet: pandas DataFrame with one column per ticker. It is copied,
            so the caller's frame is not modified.
        corrThreshold: minimum correlation required for a pair.
        limitResult: stop as soon as this many pairs were found; any value
            that is not positive means "no limit".
        debug: when True, print a summary line for every accepted pair.

    Returns:
        A list of dicts, one per accepted pair, with the keys ``pair``,
        ``ticker_a``, ``ticker_b``, ``series_a``, ``series_b``, ``spread``
        (A - B without NaNs), ``adf_combi`` ('AB' or 'BA'), ``adf_score``,
        ``ab_intercept``/``ab_slope`` (regression of B on A) and
        ``ba_intercept``/``ba_slope`` (regression of A on B).
    """
    remaining = dataSet.copy()
    pairs = []
    for ticker in dataSet:
        # Remove the master column first so each unordered pair is visited once.
        masterStock = remaining.pop(ticker)
        for _ticker in remaining:
            candidate = remaining[_ticker]
            corr = masterStock.corr(candidate)
            # A NaN correlation compares False and is therefore skipped too.
            if not (corr >= corrThreshold):
                continue

            # adfuller raises on NaN input, hence the dropna() calls.
            spread = (masterStock - candidate).dropna()
            _spread = (candidate - masterStock).dropna()
            adfTest = adfuller(spread)
            _adfTest = adfuller(_spread)
            adfullerScore = min(adfTest[0], _adfTest[0])
            pvalue = adfTest[1]
            crit = adfTest[4]
            if not (adfullerScore < crit['5%'] and pvalue < 0.001):
                continue

            # Fit the regressions on the rows where BOTH series have a value.
            # Dropping NaNs from each series separately can pair up values
            # from different rows or produce arrays of different lengths.
            joint = pd.concat([masterStock, candidate], axis=1, join='inner').dropna()
            A = joint.iloc[:, 0].values
            B = joint.iloc[:, 1].values

            # B explained by A: B = intercept + slope * A
            model = LinearRegression().fit(A.reshape(-1, 1), B)
            lrAB = {'intercept': model.intercept_, 'slope': model.coef_[0]}
            # A explained by B: A = intercept + slope * B
            model = LinearRegression().fit(B.reshape(-1, 1), A)
            lrBA = {'intercept': model.intercept_, 'slope': model.coef_[0]}

            # Record which difference produced the more negative ADF statistic.
            if adfTest[0] <= _adfTest[0]:
                fullerCombination = 'AB'  # A - B
            else:
                fullerCombination = 'BA'  # B - A

            pairs.append({
                'pair': ticker + '/' + _ticker,
                'ticker_a': ticker,
                'ticker_b': _ticker,
                'series_a': masterStock,
                'series_b': candidate,
                'spread': spread,
                'adf_combi': fullerCombination,
                'adf_score': adfullerScore,
                'ab_intercept': lrAB['intercept'],
                'ab_slope': lrAB['slope'],
                'ba_intercept': lrBA['intercept'],
                'ba_slope': lrBA['slope'],
            })
            if debug:
                print('%s &' % ticker, '%s' % _ticker, 'with corr: %s' % corr,
                      'p-value: %s' % pvalue, 'adfScore: %s' % adfullerScore)

            # Returning here leaves both loops at once when the limit is hit.
            if 0 < limitResult <= len(pairs):
                return pairs
    return pairs
def retrievePair(tickerA, tickerB, dataSet):
    """Build the pair description for two specific tickers of ``dataSet``.

    Args:
        tickerA: column label used as series A.
        tickerB: column label used as series B.
        dataSet: pandas DataFrame with one column per ticker.

    Returns:
        An empty dict when the two labels are not both found as distinct
        columns. Otherwise a dict with the keys ``pair``, ``ticker_a``,
        ``ticker_b``, ``series_a``, ``series_b``, ``spread`` (A - B without
        NaNs), ``adf_combi`` ('AB' or 'BA'), ``adf_score``,
        ``ab_intercept``/``ab_slope`` (regression of B on A) and
        ``ba_intercept``/``ba_slope`` (regression of A on B).
    """
    foundPairs = {}
    for ticker in dataSet:
        if ticker == tickerA:
            foundPairs['ticker_a'] = dataSet[ticker]
        elif ticker == tickerB:
            foundPairs['ticker_b'] = dataSet[ticker]
    if len(foundPairs) != 2:
        return {}

    seriesA = foundPairs['ticker_a']
    seriesB = foundPairs['ticker_b']

    # adfuller raises on NaN input, hence the dropna() calls.
    spread = (seriesA - seriesB).dropna()
    _spread = (seriesB - seriesA).dropna()
    adfTest = adfuller(spread)
    _adfTest = adfuller(_spread)
    adfullerScore = min(adfTest[0], _adfTest[0])

    # Fit the regressions on the rows where BOTH series have a value.
    # Dropping NaNs from each series separately can pair up values from
    # different rows or produce arrays of different lengths.
    joint = pd.concat([seriesA, seriesB], axis=1, join='inner').dropna()
    A = joint.iloc[:, 0].values
    B = joint.iloc[:, 1].values

    # B explained by A: B = intercept + slope * A
    model = LinearRegression().fit(A.reshape(-1, 1), B)
    lrAB = {'intercept': model.intercept_, 'slope': model.coef_[0]}
    # A explained by B: A = intercept + slope * B
    model = LinearRegression().fit(B.reshape(-1, 1), A)
    lrBA = {'intercept': model.intercept_, 'slope': model.coef_[0]}

    # Record which difference produced the more negative ADF statistic.
    if adfTest[0] <= _adfTest[0]:
        fullerCombination = 'AB'  # A - B
    else:
        fullerCombination = 'BA'  # B - A

    return {
        'pair': tickerA + '/' + tickerB,
        'ticker_a': tickerA,
        'ticker_b': tickerB,
        'series_a': seriesA,
        'series_b': seriesB,
        'spread': spread,
        'adf_combi': fullerCombination,
        'adf_score': adfullerScore,
        'ab_intercept': lrAB['intercept'],
        'ab_slope': lrAB['slope'],
        'ba_intercept': lrBA['intercept'],
        'ba_slope': lrBA['slope'],
    }
# Filter stock pairs that meet the correlation and cointegration conditions.
# 'dataSet' is the set of initial stocks; it must be a pandas DataFrame.
# 'corrThreshold' is the minimum correlation a pair must reach to pass the filter.
# 'limitResult' is the number of filtered results we want to obtain. Once this
# number of filtered pairs has been found, the function stops running and returns the result.
# If it equals -1, the search covers all available results.
# 'debug': if True, output all debug-related data.
# Screen every column pair of f500_days (defined outside this section).
res = filterPairs(dataSet=f500_days, corrThreshold=0.70, limitResult=-1, debug=False)
len(res)  # bare expression: only displays the count when run as a notebook cell

# List each accepted pair next to its position in `res`.
for n, obj in enumerate(res):
    print(n, obj['pair'])

# Pick one pair by position for closer inspection.
# Alternative: data = retrievePair('AAPL', 'XEL', dataSet=f500_days)
data = res[51]
print(data['pair'])

# How much industry detail can be shown depends on how the data was loaded.
ticker_a = data['ticker_a']
ticker_b = data['ticker_b']
if loadMethod == 'yahoo':
    info_a = tickers_by_industries[ticker_a]
    print(' Ticker A: %s (%s/%s)' % (ticker_a, info_a['gics'], info_a['gics_sub']))
    info_b = tickers_by_industries[ticker_b]
    print(' Ticker B: %s (%s/%s)' % (ticker_b, info_b['gics'], info_b['gics_sub']))
elif loadMethod == 'sqlite':
    print(' Ticker A: %s (%s)' % (ticker_a, tickers_by_industries[ticker_a]))
    print(' Ticker B: %s (%s)' % (ticker_b, tickers_by_industries[ticker_b]))
else:
    print(' Ticker A: %s' % ticker_a)
    print(' Ticker B: %s' % ticker_b)

# Regression coefficients in both directions.
print('')
print('AB intersept: %s' % data['ab_intercept'])
print('AB slope: %s' % data['ab_slope'])
print('')
print('BA intersept: %s' % data['ba_intercept'])
print('BA slope: %s' % data['ba_slope'])

# ADF summary.
print('')
print('AdFuller the most negative combination: %s' % data['adf_combi'])
print('AdFuller score: %s' % data['adf_score'])

# Charts for the selected pair.
series_a = data['series_a']
series_b = data['series_b']
pair_spread = data['spread']

plotZscore(pair_spread, up=2.0, down=-2.0)
plotLinearRegression(series_a, series_b)
plotPriceRatio(series_a, series_b)

both_stocks = pd.concat([series_a, series_b], axis=1)
both_stocks.plot()  # plot both stocks

pair_spread.plot()
plt.axhline(pair_spread.mean(), color='red', linestyle='--')  # add the mean to the spread plot