Notebook
In [138]:
import numpy as np
import pandas as pd
from scipy import stats
from pytz import timezone
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import math
from pykalman import KalmanFilter
from pandas.stats.api import ols
from sklearn.decomposition import FastICA, PCA
from statsmodels.stats.stattools import jarque_bera
import functools
# why on earth isn't this allowed??
#import matplotlib.finance.candlestick

today = pd.Timestamp("2009-01-01")
start = pd.Timestamp(today) - pd.Timedelta(days=250)
end = pd.Timestamp(today) + pd.Timedelta(days=60)

# the same number of buckets as
EquivalentMinutesPerBucket = 78
BucketsPerDay = 390/EquivalentMinutesPerBucket
In [71]:
# get data
syms = ["SPY","DIA","QQQ"]

def get_data(syms, start, end):
    opens = get_pricing(symbols(syms), frequency='minute',
                        start_date=start, end_date=end, fields='open_price').dropna()
    highs = get_pricing(symbols(syms), frequency='minute',
                        start_date=start, end_date=end, fields='high').dropna()
    lows = get_pricing(symbols(syms), frequency='minute',
                       start_date=start, end_date=end, fields='low').dropna()
    closes = get_pricing(symbols(syms), frequency='minute',
                         start_date=start, end_date=end, fields='close_price').dropna()
    volumes = get_pricing(symbols(syms), frequency='minute',
                         start_date=start, end_date=end, fields='volume').dropna()
    opens.index = opens.index.map(lambda x: x.tz_convert("US/Eastern"))
    highs.index = highs.index.map(lambda x: x.tz_convert("US/Eastern"))
    lows.index = lows.index.map(lambda x: x.tz_convert("US/Eastern"))
    closes.index = closes.index.map(lambda x: x.tz_convert("US/Eastern"))
    volumes.index = volumes.index.map(lambda x: x.tz_convert("US/Eastern"))
    return opens, highs, lows, closes, volumes

opens, highs, lows, closes, volumes = get_data(syms, start, end)
In [139]:
# motivation - resampled series in time still have a tricky distribution due to heteroskedasticity

# resampling to N minutes since we ultimately will be making equivariance buckets per day
freq = str(EquivalentMinutesPerBucket)+"T"
c_resamp = closes[start:today].resample(freq, how="last").dropna()
o_resamp = opens[start:today].resample(freq, how="first").dropna()

rets = np.log(c_resamp).diff().fillna(0)
rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))

def test_rets_jarque_bera(rets):
    name = ['Jarque-Bera', 'Chi^2 two-tail prob.', 'Skew', 'Kurtosis']
    test_results = jarque_bera(rets)
    
    for i in range(0,len(rets.columns)):
        sym = rets.columns[i].symbol
        for j in range(0,len(name)):    
            print("Sym: %s Test: %s, Value: %f" % (sym,name[j],test_results[j][i]))
    
test_rets_jarque_bera(rets)
    
Sym: SPY Test: Jarque-Bera, Value: 1698.635386
Sym: SPY Test: Chi^2 two-tail prob., Value: 0.000000
Sym: SPY Test: Skew, Value: 0.158234
Sym: SPY Test: Kurtosis, Value: 9.277181
Sym: DIA Test: Jarque-Bera, Value: 1659.898916
Sym: DIA Test: Chi^2 two-tail prob., Value: 0.000000
Sym: DIA Test: Skew, Value: 0.299320
Sym: DIA Test: Kurtosis, Value: 9.184168
Sym: QQQ Test: Jarque-Bera, Value: 1403.834236
Sym: QQQ Test: Chi^2 two-tail prob., Value: 0.000000
Sym: QQQ Test: Skew, Value: 0.074583
Sym: QQQ Test: Kurtosis, Value: 8.711835
In [140]:
# calculate cross-sectional profile
def x_profile(vdf):
    return (vdf / vdf.groupby(lambda x: x.date).transform(np.sum)).groupby(lambda x: x.time).mean()

# we use only volumes from start->today, to see how the bucketing works out-of-sample
#rets = (closes-opens)/opens
rets = np.log(closes).diff().fillna(0)
# ASSUMING A MEAN RETURN OF 0
variances = rets*rets
vp = x_profile(variances[start:today])
vp.plot(title="Variance profiles")
vp.cumsum().plot(title="Cum. Variance")

# show how the default time-wise aggregation of bars leads to bars with wildly different activity
# (contributing to the the leptokurtotic distribution above)
def resample_barcount(df,minutes_per_bar,agg):
    work = df.copy()
    work['bar'] = work.groupby(lambda x: x.date).cumcount().apply(lambda x: x / minutes_per_bar)
    avg_timebucket_vols = work.groupby(lambda x: x.date).apply(lambda day: day.groupby('bar').apply(agg))
    avg_timebucket_vols = avg_timebucket_vols.drop('bar', axis=1)
    return avg_timebucket_vols

resample_barcount(variances,EquivalentMinutesPerBucket,sum).groupby(lambda x: x[1]).mean().plot(title="Unadjusted " + str(EquivalentMinutesPerBucket)+"-minute buckets")
# here we aggregate all the volume profiles into one.  theoretically, we could bucket each stock
# separately, but it would make for different bucket edges, which means that the resulting bucketed
# time series would no longer be homogenous (across stocks) in time.
vp = vp.mean(axis=1)
In [141]:
def calc_buckets(x, bucket_count):
    # vp counts up from ~0.0 -> ~1.0
    vp_raw = x_profile(x).mean(axis=1)
    vp_cumsum = vp_raw.cumsum()
    vp_cumsum[vp_cumsum>1.0] = 1.0 # kind of a hack, but we have numerical errors leading to >1
    vp = vp_cumsum.diff()
    vp.iloc[0] = vp_raw.iloc[0]
    print "First bar: " + str(vp_raw.iloc[0])

    bucket_target = 1.0/bucket_count
    print "Bucket target: " + str(bucket_target)
    current_bucket = 0
    current_bucket_sum = 0
    cum_sum = 0.0
    buckets = []
    # would rather this were vectorized, but I can't think of how to do it.
    # at least it's just 0->390
    for x,xs in zip(vp,vp_cumsum):
        current_bucket_sum = current_bucket_sum + x
        buckets.append(current_bucket)
        if (current_bucket_sum > bucket_target):
            current_bucket = current_bucket + 1
            current_bucket_sum = 0.0
            bucket_target = (1.0 - xs) / (bucket_count - current_bucket)
            
    buckets = pd.Series(buckets, index=vp.index)
    return buckets

buckets = calc_buckets(variances[start:end], BucketsPerDay)
minutes_per_bucket = buckets.groupby(buckets).count()
print minutes_per_bucket
minutes_per_bucket.plot(title="Minutes per bucket")

def d_and_t_to_dt(d, t):
    return pd.Timestamp(datetime.datetime(d.year, d.month, d.day, t.hour, t.minute, t.second)).tz_localize("US/Eastern")

# this is a function to resample the dataframe given some intraday bucketing
def resample_buckets(df, buckets, agg):
    # calculate the bucket edges, might be helpful when converting to algo - stop in handle_data
    # at these times to do stuff
    bucket_edges = buckets.diff().shift(-1).fillna(1)
    bucket_edges = bucket_edges[bucket_edges>0].cumsum()
    df = df.copy()
    # don't know a cleaner way of doing this
    df['time'] = df.index.map(lambda x: x.time)
    df = pd.merge(left=df,left_on='time',right=pd.DataFrame({'bucket':buckets}),right_index=True)
    df = df.drop('time',axis=1)
    result = df.groupby(lambda x: x.date).apply(lambda df: df.groupby('bucket').apply(agg))
    result = result.drop('bucket', axis=1)
    # here, the index is a multilevel index.  change it back to regular (inhomogeneous) DatetimeIndex
    result.index = result.index.set_levels([result.index.levels[0],bucket_edges.index]).map(lambda x: d_and_t_to_dt(x[0],x[1]))
    return result
First bar: 0.186070064653
Bucket target: 0.2
0      5
1     69
2    132
3    111
4     73
dtype: int64
In [142]:
bucketed_variance = resample_buckets(variances, buckets, lambda x: x.sum())
bucketed_volume = resample_buckets(volumes, buckets, lambda x: x.sum())
bucketed_open = resample_buckets(opens, buckets, lambda x: x.iloc[0])
bucketed_high = resample_buckets(highs, buckets, lambda x: x.max())
bucketed_low = resample_buckets(lows, buckets, lambda x: x.min())
bucketed_close = resample_buckets(closes, buckets, lambda x: x.iloc[-1])

def calc_bucketed_vwap(opens,closes,volumes): 
    oc_mid = (opens + closes) / 2.0
    weighted_mid = oc_mid * volumes
    bucketed_weighted_mid_sum = resample_buckets(weighted_mid, buckets, lambda x: x.sum())
    bucketed_volume = resample_buckets(volumes, buckets, lambda x: x.sum())
    vwap = bucketed_weighted_mid_sum / bucketed_volume
    return vwap
    
bucketed_vwap = calc_bucketed_vwap(opens, closes, volumes)
In [143]:
# we seem to have generally gotten the intraday trend out of there
bucketed_variance[start.date():today.date()].groupby(lambda x: x.time).mean().plot(title="In-sample Mean variance per bucket")
bucketed_variance[today.date():end.date()].groupby(lambda x: x.time).mean().plot(title="Out-of-sample Mean variance per bucket")
Out[143]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fa7656fcbd0>
In [159]:
# finally, lets calculate our equivolume bucketed vwap
bucket_rets = np.log(bucketed_close).diff().fillna(0)

def draw_dist(x,s):
    fig = plt.figure()
    fig.suptitle(s)
    sns.distplot(x,kde=False,fit=stats.norm)

for s in syms:
    srets = bucket_rets[symbols(s)].groupby(lambda x: x.time).apply(lambda x: draw_dist(x,s))

#print bucket_rets
#bucket_rets.apply(lambda x: sns.distplot(x,kde=False,fit=stats.norm))
#test_rets_jarque_bera(bucket_rets)
In [160]:
max_hurst_lag=50
def my_hurst(ts,intercept):
    ts = pd.Series(ts).dropna()
    lags = range(2, max_hurst_lag) 
    # Calculate the array of the variances of the lagged differences
    tau = [np.sqrt((ts - ts.shift(-lag)).dropna().std()) for lag in lags]
    # Use a linear fit to estimate the Hurst Exponent
    x = pd.Series(np.log(lags))
    y = pd.Series(np.log(tau))
    # need intercept=False due to http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2564916&download=yes
    # however, the results really are very poor, at least for intraday data.  Assistance welcome.
    return pd.ols(x=x,y=y,intercept=intercept).beta['x']*2
In [ ]: