Quantopian's community platform is shutting down. Please read this post for more information and download your code.
Back to Community
Gaussian Conditional Copula

In the spirit of sharing on Q forums, I am posting my code for Gaussian Copula class that you can use for pair trading or basket trading.

The gist of using copulas is that you identify the conditional cdf of a series based on other series and build a score around it. There are many references to it if you search for copula based pairs trading on google. Happy researching :)

def cov2corr( A ):  
    d = np.sqrt(A.diagonal())  
    A = ((A.T/d).T)/d  
    return A

from sklearn.svm import SVR  
from scipy.stats import mvn  
#from statsmodels.sandbox.distributions.extras import mvnormcdf

# avioding boundary trouble  
loc_eps = 0.0000001


# ================================================================  
# copula classes  
# ================================================================


class GaussianCopulaConditional:  
    """ """

    def __init__(self, corr_matrix=np.eye(2), conditionals=None):  
        """  
        Gaussian covariance matrix and variables variables list;  
        target variables will be kept in 'targets' list  
        """  
        self.cr = pd.DataFrame(corr_matrix)  
        cls = self.cr.columns.values

        # by default conditionals contains everything except the last component  
        if conditionals is None:  
            conditionals = cls[:-1]

        # the rest of components are targets  
        targets = [s for s in cls if s not in conditionals]

        # keep the variables lists  
        self.targets = targets  
        self.conditionals = conditionals

        # covariance submatrices  
        sII = np.array(self.cr.loc[conditionals, conditionals])  
        sIJ = np.array(self.cr.loc[conditionals, targets])  
        sJI = np.array(self.cr.loc[targets, conditionals])  
        sJJ = np.array(self.cr.loc[targets, targets])

        # conditional mean and covariance  
        pr = np.linalg.solve(sII, sIJ)  
        self.cond_cov = sJJ - sJI.dot(pr)  
        self.mupr = np.linalg.solve(sII, sIJ).T

    def make_input(self, u, u_cond):  
        """ """  
        if u is None:  
            u = 0.5 * np.ones((1, len(self.targets)))  
        if u_cond is None:  
            u_cond = 0.5 * np.ones((1, len(self.conditionals)))  
        u[u == 0] = loc_eps  
        u[u == 1] = 1 - loc_eps  
        return u, u_cond

    def pdf(self, u=None, u_cond=None):  
        """  
        u is 2-dim array n_obs x n_targets  
        u_cond is 2-dim array 1 x n_conditionals  
        """  
        u, u_cond = self.make_input(u, u_cond)  
        x_cond = ss.norm.ppf(u_cond)  
        x = ss.norm.ppf(u)  
        mn = self.mupr.dot(x_cond.T).T[0]  
        if len(self.targets) <= 1:  
            res = ss.norm.pdf(x, mn[0], self.cond_cov[0, 0])  
            fx = ss.norm.pdf(x)  
            res = pd.Series((res / fx)[:, 0], index=u[:, 0])  
        else:  
            res = ss.multivariate_normal.pdf(x, mean=mn, cov=self.cond_cov)  
            fx = ss.norm.pdf(x).prod(axis=1)  
            res = pd.Series(res / fx, index=range(x.shape[0]))  
        res.name = 'Cond PDF of ' + ', '.join(self.targets)  
        return res

    def cdf(self, u=None, u_cond=None):  
        """  
        u is 2-dim array n_obs x n_targets  
        u_cond is 2-dim array 1 x n_conditionals  
        """  
        u, u_cond = self.make_input(u, u_cond)  
        x_cond = ss.norm.ppf(u_cond)  
        x = ss.norm.ppf(u)  
        mn = self.mupr.dot(x_cond.T).T[0]  
        if len(self.targets) <= 1:  
            # univariate normal  
            res = ss.norm.cdf(x, mn[0], self.cond_cov[0, 0])[0,0]  
        else:  
            # mvnormcdf does not accept multiple input points  
            res = np.zeros(x.shape[0])  
            for i in range(x.shape[0]):  
                res[i] = mvnormcdf(x[i, :], mn, self.cond_cov)  
        return res


class GaussianCopula:  
    """ """

    def __init__(self, corr_matr=pd.DataFrame(np.eye(2))):  
        """  
        initialize copula (as two dimensional indipendent one by default)  
        """  
        self.set_corr(corr_matr)

    def set_corr(self, cr):  
        """ set correlation matrix parameter and compute its inverse """  
        self.cr = pd.DataFrame(cr)  
        self.dim = self.cr.shape[0]  
        cls = self.cr.columns  
        self.cri = pd.DataFrame(np.linalg.inv(self.cr), columns=cls, index=cls)

    def fit(self, data):  
        """  
        fit the copula from data  
        the data should be 'm times n' Numpy array or Pandas DataFrame,  
        m = number of samples (observations), n = number of components  
        """  
        cr = pd.DataFrame(data).corr(method='spearman')  
        self.set_corr(pearson_from_spearman(cr))

    def rvs(self, size=100000):  
        """  
        simulate sample for Monte Carlo method  
        size = sample volume, 100,000 by default  
        """  
        sam = ss.multivariate_normal.rvs(cov=self.cr, size=size)  
        return pd.DataFrame(ss.norm.cdf(sam), columns=self.cr.columns)

    def pdf(self, u):  
        """ probability density function of the copula """  
        x = ss.norm.ppf(u)  
        mat = self.cri - np.eye(self.dim)  
        pok = -0.5 * x.dot(mat).dot(x)  
        mult = 1 / np.sqrt(np.linalg.det(self.cri))  
        return mult * np.exp(pok)

    def probability(self, event_func, n_obs=100000):  
        """  
        calculates probability of an event A, which is defined via its  
        indicator function "event_func"  
        """  
        u = self.rvs(size=n_obs)  
        return event_func(u).mean()

    def conditional_probability(self, event_func, cond_func, n_obs=100000):  
        """  
        calculates conditional probability of an event A, given event B,  
        which are defined via there indicator functions "event_func" and  
        "cond_func", respectively  
        """  
        u = self.rvs(size=n_obs)  
        prob_of_cond = cond_func(u).mean()  
        prob_of_event_and_cond = (event_func(u) & cond_func(u)).mean()  
        if prob_of_cond > 0:  
            res = prob_of_event_and_cond / prob_of_cond  
        else:  
            res = 0  
        return res

    def conditional_mean_variance_event(self, y_func, cond_func, n_obs=100000):  
        """  
        calculates conditional mean and variance of a random variable Y = f(U),  
        given condition A, with function f defined by "y_func", and  
        condition A defined by its indicator function "cond_func"  
        """  
        u = self.rvs(size=n_obs)  
        cond_ser = cond_func(u).astype('float')  
        prob_of_cond = cond_ser.mean()  
        event_and_cond = (y_func(u) * cond_ser).mean()  
        event2_and_cond = (y_func(u) ** 2 * cond_ser).mean()  
        if prob_of_cond > 0:  
            me = event_and_cond / prob_of_cond  
            va = event2_and_cond / prob_of_cond - me ** 2  
        else:  
            me, va = 0, 0  
        return me, va

    def conditional_mean_variance_vector(self, y_func, x_func, n_obs=5000):  
        """  
        creates an SVR model for computing expected mean of Y given X,  
        where X, Y are functions of U;  
        returns the model 'mod';  
        to predict value of Y in a new point X, call mod.predict(X), as usual  
        """  
        u = self.rvs(size=n_obs)

        # expected value portion  
        y = y_func(u)  
        x = x_func(u)  
        mod_ev = SVR()  
        mod_ev.fit(x, y)  
        ym = mod_ev.predict(x)

        # variance portion  
        yv = (y - ym) ** 2  
        mod_va = SVR()  
        mod_va.fit(x, yv)

        return mod_ev, mod_va

# =================================================================  
# conversion from Pearson correlations to Spearman correlations  
# and vice versa  
# =================================================================


def spearman_from_pearson(r):  
    """ convertion for normal multivariate """  
    s = 6 / np.pi * np.arcsin(0.5 * r)  
    return s


def pearson_from_spearman(s):  
    """ conversion for normal multivariate """  
    r = 2 * np.sin(s * np.pi / 6)  
    return r


# =====================================================================  
# Gaussian multivariate normal cdf  
# =====================================================================

informcode = {0: 'normal completion with ERROR < EPS',  
              1: '''completion with ERROR > EPS and MAXPTS function values used;  
                    increase MAXPTS to decrease ERROR;''',  
              2: 'N > 500 or N < 1'}

def mvstdnormcdf(lower, upper, corrcoef, **kwds):  
    n = len(lower)  
    lower = np.array(lower)  
    upper = np.array(upper)  
    corrcoef = np.array(corrcoef)

    correl = np.zeros(int(n*(n-1)/2.0))  #dtype necessary?

    if (lower.ndim != 1) or (upper.ndim != 1):  
        raise ValueError('can handle only 1D bounds')  
    if len(upper) != n:  
        raise ValueError('bounds have different lengths')  
    if n==2 and corrcoef.size==1:  
        correl = corrcoef  
    elif corrcoef.ndim == 1 and len(corrcoef) == n*(n-1)/2.0:  
        correl = corrcoef  
    elif corrcoef.shape == (n,n):  
        correl = corrcoef[np.tril_indices(n, -1)]  
    else:  
        raise ValueError('corrcoef has incorrect dimension')

    if not 'maxpts' in kwds:  
        if n >2:  
            kwds['maxpts'] = 10000*n

    lowinf = np.isneginf(lower)  
    uppinf = np.isposinf(upper)  
    infin = 2.0*np.ones(n)

    np.putmask(infin,lowinf,0)# infin.putmask(0,lowinf)  
    np.putmask(infin,uppinf,1) #infin.putmask(1,uppinf)  
    #this has to be last  
    np.putmask(infin,lowinf*uppinf,-1)

    error, cdfvalue, inform = sp.stats.mvn.mvndst(lower,upper,infin,correl,**kwds)  
    if inform:  
        print('something wrong', informcode[inform], error)  
    return cdfvalue

def mvnormcdf(upper, mu, cov, lower=None,  **kwds):  
    upper = np.array(upper)  
    if lower is None:  
        lower = -np.ones(upper.shape) * np.inf  
    else:  
        lower = np.array(lower)  
    cov = np.array(cov)  
    stdev = np.sqrt(np.diag(cov)) # standard deviation vector  
    #do I need to make sure stdev is float and not int?  
    #is this correct to normalize to corr?  
    lower = (lower - mu)/stdev  
    upper = (upper - mu)/stdev  
    divrow = np.atleast_2d(stdev)  
    corr = cov/divrow/divrow.T  
    #v/np.sqrt(np.atleast_2d(np.diag(covv)))/np.sqrt(np.atleast_2d(np.diag(covv))).T

    return mvstdnormcdf(lower, upper, corr, **kwds)  
13 responses

I will put up a notebook later on how to use it.

Attached is a notebook.

Excellent +3 Aqua thanks for sharing!

Very nice work I have to say!
Have you ever tried to use a different family of copulas (like Archimedean) ?

@Magnus, thanks. I haven't tried using Archimedean yet. Will have to figure out how to implement multiple dependence (using Vine Copulas?)

@Aqua, would you recommend any book to come upto speed with the level of Math that seems to be involved in your shared code above?

@Leo, there is not much on Q so use https://www.quantconnect.com/tutorials/pairs-trading-strategy/. Good tutorial in my view.

@Leo, I started learning math several years ago. I still use professional mathematicians to implement my models. So if you have ideas and want a mathematician to develop your model drop me a note and I will put you in touch.

@Aqua, thanks. I want to learn things and come unto speed on my own at the moment, because it is more fun that way. I will check with you if I need professional mathematicians help. My experience is exclusively in software programming, I wish I had done some math college degrees to be able to understand some of the more complicated things that are described in the papers you have shared in the forums.

@Aqua, great information here as always. I must thank you for your public research and algorithms, I have learnt a lot from your posts so credit to you. Do you have a backtest you can post using the Copula Method? I can't seem to get this to work :(

@Leo, first start by learning the math behind mean-reversion of pairs. @MaxMargenot from Q gives a good explanation,
(https://www.youtube.com/watch?v=g-qvFjvyqcs) and Ernest Chan's book on "Algorithmic Trading-winning strategies and their rational" (look at page 87). Then look at @Aqua's notebook.
I hope that helps.

Cheers

@TWI_proptrader, sure. Either post or privately share what you are trying to do and I can put up something to show how to use these classes.

@TWI_proptrader. I am talking about advanced level math in the papers and in Aqua's implementations. The math behind mean reversion of pairs etc that you referred to are just too basic.

Has anybody tried Marshall-Olkin copula, for pairs trading or other trading projects? I am interested in how it performs relative to well-established copula families. 15 years ago, during the credit derivatives boom, there was much talk about the Marshall-Olkin copula making more sense than many other alternatives. But that discussion kind of died out with the recent events.