import numpy as np
import pandas as pd
from scipy import stats
from pytz import timezone
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import math
from pykalman import KalmanFilter
from pandas.stats.api import ols
from sklearn.decomposition import FastICA, PCA
from statsmodels.stats.stattools import jarque_bera
import functools
# why on earth isn't this allowed??
# (candlestick is a function in matplotlib.finance, not a submodule, so the import
#  below fails as written; it would need "from matplotlib.finance import candlestick")
#import matplotlib.finance.candlestick
today = pd.Timestamp("2009-01-01")
start = pd.Timestamp(today) - pd.Timedelta(days=250)
end = pd.Timestamp(today) + pd.Timedelta(days=60)
# the bucket length is chosen so we get the same number of buckets as 78-minute time bars:
# 390 trading minutes / 78 = 5 buckets per regular trading day
EquivalentMinutesPerBucket = 78
BucketsPerDay = 390 // EquivalentMinutesPerBucket
# get data
syms = ["SPY","DIA","QQQ"]
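# get_pricing and symbols are provided by the Quantopian research environment's data API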
def get_data(syms, start, end):
opens = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='open_price').dropna()
highs = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='high').dropna()
lows = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='low').dropna()
closes = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='close_price').dropna()
volumes = get_pricing(symbols(syms), frequency='minute',
start_date=start, end_date=end, fields='volume').dropna()
opens.index = opens.index.map(lambda x: x.tz_convert("US/Eastern"))
highs.index = highs.index.map(lambda x: x.tz_convert("US/Eastern"))
lows.index = lows.index.map(lambda x: x.tz_convert("US/Eastern"))
closes.index = closes.index.map(lambda x: x.tz_convert("US/Eastern"))
volumes.index = volumes.index.map(lambda x: x.tz_convert("US/Eastern"))
return opens, highs, lows, closes, volumes
opens, highs, lows, closes, volumes = get_data(syms, start, end)
# motivation - series resampled on fixed time intervals still have a tricky (fat-tailed) distribution
# due to intraday heteroskedasticity
# we resample to N minutes here since we will ultimately be making equivariance (equal-variance) buckets per day
freq = str(EquivalentMinutesPerBucket)+"T"
c_resamp = closes[start:today].resample(freq, how="last").dropna()
o_resamp = opens[start:today].resample(freq, how="first").dropna()
rets = np.log(c_resamp).diff().fillna(0)
rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))
def test_rets_jarque_bera(rets):
name = ['Jarque-Bera', 'Chi^2 two-tail prob.', 'Skew', 'Kurtosis']
test_results = jarque_bera(rets)
for i in range(0,len(rets.columns)):
sym = rets.columns[i].symbol
for j in range(0,len(name)):
print("Sym: %s Test: %s, Value: %f" % (sym,name[j],test_results[j][i]))
test_rets_jarque_bera(rets)
# calculate cross-sectional profile: each minute's share of its day's total, averaged across days
def x_profile(vdf):
return (vdf / vdf.groupby(lambda x: x.date).transform(np.sum)).groupby(lambda x: x.time).mean()
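# The one-liner above is dense; an equivalent step-by-step version (a sketch, not used below)
# to make the normalization explicit:
def x_profile_verbose(vdf):
    # each day's total, broadcast back to every minute of that day
    daily_totals = vdf.groupby(lambda x: x.date).transform(np.sum)
    # each minute's fraction of its own day's total
    share_of_day = vdf / daily_totals
    # average that fraction across days, per minute-of-day
    return share_of_day.groupby(lambda x: x.time).mean()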
# we build the profile only from data in start->today, so we can see how the bucketing
# works out-of-sample on today->end
#rets = (closes-opens)/opens
rets = np.log(closes).diff().fillna(0)
# ASSUMING A MEAN RETURN OF 0
variances = rets*rets
vp = x_profile(variances[start:today])
vp.plot(title="Variance profiles")
vp.cumsum().plot(title="Cum. Variance")
# show how the default time-wise aggregation of bars leads to bars with wildly different activity
# (contributing to the leptokurtic distribution above)
def resample_barcount(df,minutes_per_bar,agg):
work = df.copy()
    # integer bar index within each day (floor division so it stays an int under Python 3 too)
    work['bar'] = work.groupby(lambda x: x.date).cumcount() // minutes_per_bar
avg_timebucket_vols = work.groupby(lambda x: x.date).apply(lambda day: day.groupby('bar').apply(agg))
avg_timebucket_vols = avg_timebucket_vols.drop('bar', axis=1)
return avg_timebucket_vols
resample_barcount(variances,EquivalentMinutesPerBucket,sum).groupby(lambda x: x[1]).mean().plot(title="Unadjusted " + str(EquivalentMinutesPerBucket)+"-minute buckets")
# here we aggregate all the variance profiles into one. theoretically, we could bucket each stock
# separately, but that would give different bucket edges per stock, which means the resulting
# bucketed time series would no longer be homogeneous (across stocks) in time.
vp = vp.mean(axis=1)
def calc_buckets(x, bucket_count):
    # the cumulative profile below counts up from ~0.0 -> ~1.0
vp_raw = x_profile(x).mean(axis=1)
vp_cumsum = vp_raw.cumsum()
vp_cumsum[vp_cumsum>1.0] = 1.0 # kind of a hack, but we have numerical errors leading to >1
vp = vp_cumsum.diff()
vp.iloc[0] = vp_raw.iloc[0]
print "First bar: " + str(vp_raw.iloc[0])
bucket_target = 1.0/bucket_count
print "Bucket target: " + str(bucket_target)
current_bucket = 0
current_bucket_sum = 0
cum_sum = 0.0
buckets = []
    # would rather this were vectorized, but I can't think of how to do it exactly
    # (an approximate vectorized sketch follows after this function).
    # at least it's just 0->390
for x,xs in zip(vp,vp_cumsum):
current_bucket_sum = current_bucket_sum + x
buckets.append(current_bucket)
if (current_bucket_sum > bucket_target):
current_bucket = current_bucket + 1
current_bucket_sum = 0.0
bucket_target = (1.0 - xs) / (bucket_count - current_bucket)
buckets = pd.Series(buckets, index=vp.index)
return buckets
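# A roughly vectorized alternative (a sketch, not used below): cut the cumulative profile at
# multiples of 1/bucket_count. This is only approximately equivalent to the loop above, since
# the loop re-targets the remaining mass after each bucket, so edges can differ by a minute or so.
def calc_buckets_vectorized(x, bucket_count):
    vp_cumsum = x_profile(x).mean(axis=1).cumsum()
    vp_cumsum[vp_cumsum > 1.0] = 1.0
    raw = np.floor(vp_cumsum * bucket_count).astype(int)
    # a cumulative value of exactly 1.0 would map to bucket_count, so clamp to the last bucket
    return np.minimum(raw, bucket_count - 1)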
buckets = calc_buckets(variances[start:today], BucketsPerDay)
minutes_per_bucket = buckets.groupby(buckets).count()
print(minutes_per_bucket)
minutes_per_bucket.plot(title="Minutes per bucket")
def d_and_t_to_dt(d, t):
return pd.Timestamp(datetime.datetime(d.year, d.month, d.day, t.hour, t.minute, t.second)).tz_localize("US/Eastern")
# this is a function to resample the dataframe given some intraday bucketing
def resample_buckets(df, buckets, agg):
# calculate the bucket edges, might be helpful when converting to algo - stop in handle_data
# at these times to do stuff
bucket_edges = buckets.diff().shift(-1).fillna(1)
bucket_edges = bucket_edges[bucket_edges>0].cumsum()
df = df.copy()
    # don't know a cleaner way of doing this (a reindex-based sketch follows after this function)
df['time'] = df.index.map(lambda x: x.time)
df = pd.merge(left=df,left_on='time',right=pd.DataFrame({'bucket':buckets}),right_index=True)
df = df.drop('time',axis=1)
result = df.groupby(lambda x: x.date).apply(lambda df: df.groupby('bucket').apply(agg))
result = result.drop('bucket', axis=1)
# here, the index is a multilevel index. change it back to regular (inhomogeneous) DatetimeIndex
result.index = result.index.set_levels([result.index.levels[0],bucket_edges.index]).map(lambda x: d_and_t_to_dt(x[0],x[1]))
return result
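# A possibly cleaner way to attach the bucket label without the merge above (a sketch, not
# used in resample_buckets): buckets is indexed by time-of-day, so reindexing it by each
# row's time gives the per-row bucket directly.
def bucket_for_each_row(df, buckets):
    times = df.index.map(lambda x: x.time)
    return pd.Series(buckets.reindex(times).values, index=df.index, name='bucket')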
bucketed_variance = resample_buckets(variances, buckets, lambda x: x.sum())
bucketed_volume = resample_buckets(volumes, buckets, lambda x: x.sum())
bucketed_open = resample_buckets(opens, buckets, lambda x: x.iloc[0])
bucketed_high = resample_buckets(highs, buckets, lambda x: x.max())
bucketed_low = resample_buckets(lows, buckets, lambda x: x.min())
bucketed_close = resample_buckets(closes, buckets, lambda x: x.iloc[-1])
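# note: calc_bucketed_vwap below uses the per-minute (open+close)/2 midpoint as its price proxy,
# so it is a volume-weighted midpoint rather than a trade-price VWAP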
def calc_bucketed_vwap(opens,closes,volumes):
oc_mid = (opens + closes) / 2.0
weighted_mid = oc_mid * volumes
bucketed_weighted_mid_sum = resample_buckets(weighted_mid, buckets, lambda x: x.sum())
bucketed_volume = resample_buckets(volumes, buckets, lambda x: x.sum())
vwap = bucketed_weighted_mid_sum / bucketed_volume
return vwap
bucketed_vwap = calc_bucketed_vwap(opens, closes, volumes)
# the bucketing seems to have largely removed the intraday trend in per-bucket variance
bucketed_variance[start.date():today.date()].groupby(lambda x: x.time).mean().plot(title="In-sample Mean variance per bucket")
bucketed_variance[today.date():end.date()].groupby(lambda x: x.time).mean().plot(title="Out-of-sample Mean variance per bucket")
# finally, let's look at the distribution of returns over the equivariance buckets
bucket_rets = np.log(bucketed_close).diff().fillna(0)
def draw_dist(x,s):
fig = plt.figure()
fig.suptitle(s)
sns.distplot(x,kde=False,fit=stats.norm)
for s in syms:
    # one distribution per symbol per bucket time; label each figure with the symbol and the bucket time
    bucket_rets[symbols(s)].groupby(lambda x: x.time).apply(lambda x: draw_dist(x, "%s %s" % (s, x.name)))
#print bucket_rets
#bucket_rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))
#test_rets_jarque_bera(bucket_rets)
max_hurst_lag=50
def my_hurst(ts,intercept):
ts = pd.Series(ts).dropna()
lags = range(2, max_hurst_lag)
    # Calculate tau: the square root of the standard deviation of the lagged differences
tau = [np.sqrt((ts - ts.shift(-lag)).dropna().std()) for lag in lags]
# Use a linear fit to estimate the Hurst Exponent
x = pd.Series(np.log(lags))
y = pd.Series(np.log(tau))
# need intercept=False due to http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2564916&download=yes
# however, the results really are very poor, at least for intraday data. Assistance welcome.
return pd.ols(x=x,y=y,intercept=intercept).beta['x']*2
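# pd.ols is the legacy pandas OLS interface (removed in later pandas releases). A minimal
# sketch of the same fit using np.polyfit instead; note that polyfit always fits an
# intercept, so it does not reproduce the intercept=False variant discussed above.
def my_hurst_polyfit(ts, max_lag=max_hurst_lag):
    ts = pd.Series(ts).dropna()
    lags = range(2, max_lag)
    tau = [np.sqrt((ts - ts.shift(-lag)).dropna().std()) for lag in lags]
    # slope of log(tau) vs log(lag); the Hurst exponent estimate is twice the slope
    slope, _ = np.polyfit(np.log(lags), np.log(tau), 1)
    return slope * 2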