import numpy as np
import pandas as pd
from scipy import stats
from pytz import timezone
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import math
from pykalman import KalmanFilter
from pandas.stats.api import ols
from sklearn.decomposition import FastICA, PCA
from statsmodels.stats.stattools import jarque_bera
import functools
# why on earth isn't this allowed??
#import matplotlib.finance.candlestick
today = pd.Timestamp("2015-08-01")
start = pd.Timestamp(today) - pd.Timedelta(days=250)
end = pd.Timestamp(today) + pd.Timedelta(days=7)
# the same number of buckets as
EquivalentMinutesPerBucket = 15
BucketsPerDay = 390/EquivalentMinutesPerBucket
# get data
syms = ["SPY","DIA","QQQ"]
opens = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='open_price').dropna()
highs = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='high').dropna()
lows = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='low').dropna()
closes = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='close_price').dropna()
volumes = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='volume').dropna()
opens.index = opens.index.map(lambda x: x.tz_convert("US/Eastern"))
highs.index = highs.index.map(lambda x: x.tz_convert("US/Eastern"))
lows.index = lows.index.map(lambda x: x.tz_convert("US/Eastern"))
closes.index = closes.index.map(lambda x: x.tz_convert("US/Eastern"))
volumes.index = volumes.index.map(lambda x: x.tz_convert("US/Eastern"))
# motivation - resampled series in time still have a tricky distribution due to heteroskedasticity
# use close-open returns to avoid overnights, a whole other problem
# resampling to N minutes since we ultimately will be making equivolume buckets per day
freq = str(EquivalentMinutesPerBucket)+"T"
o_resamp = opens.resample(freq, how="first").dropna()
c_resamp = closes.resample(freq, how="last").dropna()
rets = (np.log(c_resamp) - np.log(o_resamp)).fillna(0)
rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))
def test_rets_jarque_bera(rets):
name = ['Jarque-Bera', 'Chi^2 two-tail prob.', 'Skew', 'Kurtosis']
test_results = jarque_bera(rets)
for i in range(0,len(rets.columns)):
sym = rets.columns[i].symbol
for j in range(0,len(name)):
print("Sym: %s Test: %s, Value: %f" % (sym,name[j],test_results[j][i]))
test_rets_jarque_bera(rets)
# calculate cross-sectional volume profile
def vol_profile(vdf):
return (vdf / vdf.groupby(lambda x: x.date).transform(np.sum)).groupby(lambda x: x.time).mean()
# we use only volumes from start->today, to see how the bucketing works out-of-sample
vp = vol_profile(volumes[start:today])
vp.plot(title="Volume profiles")
# show how the default time-wise aggregation of bars leads to bars with wildly different activity
# (contributing to the the leptokurtotic distribution above)
def resample_barcount(df,agg):
work = df
work['bar'] = work.groupby(lambda x: x.date).cumcount().apply(lambda x: x / EquivalentMinutesPerBucket)
avg_timebucket_vols = work.groupby(lambda x: x.date).apply(lambda day: day.groupby('bar').apply(agg))
avg_timebucket_vols = avg_timebucket_vols.drop('bar', axis=1)
return avg_timebucket_vols
resample_barcount(volumes,sum).groupby(lambda x: x[1]).mean().plot(title="Unadjusted" + str(EquivalentMinutesPerBucket)+"-minute buckets")
# here we aggregate all the volume profiles into one. theoretically, we could bucket each stock
# separately, but it would make for different bucket edges, which means that the resulting bucketed
# time series would no longer be homogenous (across stocks) in time.
vp = vp.mean(axis=1)
bucket_count = BucketsPerDay
bucket_quantiles = np.arange(0,1.0,1.0/bucket_count)
# calculate our intraday buckets
buckets = pd.Series(np.digitize(vp.cumsum(),bucket_quantiles), index=vp.index)
buckets.groupby(buckets).count().plot(title="Minutes per bucket")
# calculate the bucket edges, might be helpful when converting to algo - stop in handle_data
# at these times to do stuff
bucket_edges = buckets.diff().fillna(0)
# this is a function to resample the dataframe given some intraday bucketing
def resample_buckets(df, buckets, agg):
# don't know a cleaner way of doing this
df['time'] = df.index.map(lambda x: x.time)
df = pd.merge(left=df,left_on='time',right=pd.DataFrame({'bucket':buckets}),right_index=True)
df = df.drop('time',axis=1)
result = df.groupby(lambda x: x.date).apply(lambda df: df.groupby('bucket').apply(agg))
result = result.drop('bucket', axis=1)
return result
bucketed_volume = resample_buckets(volumes, buckets, lambda x: x.sum())
bucketed_open = resample_buckets(opens, buckets, lambda x: x.iloc[0])
bucketed_high = resample_buckets(highs, buckets, lambda x: x.max())
bucketed_low = resample_buckets(lows, buckets, lambda x: x.min())
bucketed_close = resample_buckets(closes, buckets, lambda x: x.iloc[-1])
# we seem to have generally gotten the intraday trend out of there
bucketed_volume[start.date():today.date()].groupby(lambda x: x[1]).mean().plot(title="In-sample Mean volume per bucket")
bucketed_volume[today.date():end.date()].groupby(lambda x: x[1]).mean().plot(title="Out-of-sample Mean volume per bucket")
# finally, lets calculate our equivolume bucket open->close returns to check for better normality
bucket_rets = (np.log(bucketed_close) - np.log(bucketed_open)).fillna(0)
#print bucket_rets
bucket_rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))
test_rets_jarque_bera(bucket_rets)
max_hurst_lag=100
def my_hurst(ts):
ts = pd.Series(ts).dropna()
lags = range(2, max_hurst_lag)
# Calculate the array of the variances of the lagged differences
tau = [np.sqrt((ts - ts.shift(-lag)).dropna().std()) for lag in lags]
# Use a linear fit to estimate the Hurst Exponent
x = pd.Series(np.log(lags))
y = pd.Series(np.log(tau))
# need intercept=False due to http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2564916&download=yes
# however, the results really are very poor, at least for intraday data. Assistance welcome.
return pd.ols(x=x,y=y,intercept=True).beta['x']*2
classic_closes = resample_barcount(closes,lambda x: x.iloc[-1])[symbols('SPY')]
equivolume_closes = bucketed_close[symbols('SPY')]
# this is much too slow! look at later
#classic_hursts = pd.rolling_apply(classic_closes, max_hurst_lag, my_hurst, min_periods=max_hurst_lag)
#equivolume_hursts = pd.rolling_apply(equivolume_closes, max_hurst_lag, my_hurst, min_periods=max_hurst_lag)
#cs = pd.Series(classic_hursts)
#es = pd.Series(equivolume_hursts)
#hursts = pd.DataFrame({'classic': cs, 'equivolume': es})
#hursts.plot()
print my_hurst(classic_closes)
print my_hurst(equivolume_closes)
classic_closes.iloc[-100:].plot()
equivolume_closes.iloc[-100:].plot()
# ideally we'd like to plot a candlestick chart of our new time series, to see if it looks a bit better
# behaved than the usual, but that's not allowed, so sorry!