Notebook

Gaussian Copula Conditionals

Given a Gaussian copula $C(u),\,u\in[0,1]^d$ with correlation matrix $S$. Denote $I\subseteq\{1,\dots,d\}$ a subset of coordinate indices, and $J=\{1,\dots,d\}\setminus I$ its complement. Let $U$ be the random vector posessing joint CDF $C$, and $U_I,U_J$ denote its subvectors with corresponding components.

Denote $X=(X_1,\dots,X_d)$ the random vector obtained from $U$ using inverse to the standard normal CDF $\Phi^{-1}$ as follows:

$$ X=\left(\Phi^{-1}\left(U_1\right),\dots,\Phi^{-1}\left(U_d\right)\right). $$

It is well known that $X$ possesses joint normal distribution with zero mean and covariance matrix $S$.

Conditional normal distribution

Now let $X_I, X_J$ be subvectors of $X$ corresponding to subsets of indices $I, J$. What is the distribution of $X_J$ given $X_I=a$, where $a\in R^{\,|I|}$?

Denote $S*{IJ}$ the submatrix of $S$ with rows from $I$ and columns from $J$, and similarly $S*{II}$, $S*{JJ}$ and $S*{JI}$. If all the numbers in $I$ precede the numbers in $J$ (in general it is not so), then we would have

$$ S=\left( \begin{array}{cc} S*{II}&S*{IJ}\\ S*{JI}&S*{JJ} \end{array} \right). $$

Note that since $S$ is symmetric, we have $S*{IJ}=S*{JI}'$.

Now the conditional distribution of $X_J$ given $X_I=a$ is joint normal with mean and covariance matrix

$$ \mu(a)=E\left(X_J\left|X*I=a\right.\right)=S*{JI}S_{II}^{-1}a,\;\;\; Z=E\left((X_J-\mu(a))^2\left|X*I=a\right.\right)=S*{JJ}-S*{JI}S*{II}^{-1}S_{IJ}. $$

Denote CDF of the distribution by $F*{\mu(a),Z}$, and the corresponding PDF by $f*{\mu(a),Z}$.

Conditional copula distribution

Given a copula $C$, a joint CDF for a random vector $U$ with correlation matrix $S$. Let $U_I=b$, what is the conditional distribution of $U_j$ given $U_I=b$?

Denote $X=\Phi^{-1}(U)$, $X_I=\Phi^{-1}\left(U_I\right)$, $X_J=\Phi^{-1}\left(U_J\right)$, where the function $\Phi^{-1}$ is applied component-wise. Let $a=\Phi^{-1}(b)$, then we have for the conditional CDF of $U_J$ given $U_I=b$:

$$ P\left(U_J\leq u_J\left|U_I=b\right.\right)=P\left(\Phi^{-1}(U_J)\leq \Phi^{-1}(u_J)\left|\Phi^{-1}(U_I)=\Phi^{-1}(b)\right.\right)=P\left(X_J\leq \Phi^{-1}(u_J)\left|X_I=\Phi^{-1}(b)\right.\right). $$

The latter is joint normal cumulative distribution function with mean $S*{JI}S*{II}^{-1}\Phi^{-1}(b)$ and covariance matrix $S*{JJ}-S*{JI}S*{II}^{-1}S*{IJ}$, possessing PDF $f*{\mu\left(\Phi^{-1}(b)\right),\,Z}$ and CDF $F*{\mu\left(\Phi^{-1}(b)\right),\,Z}$.

Code

In [37]:
"""
Created on Sun Oct 22 14:16:36 2017

@author: Arcady Novosyolov
"""

import numpy as np
import pandas as pd
import scipy.stats as ss
from sklearn.svm import SVR

# avioding boundary trouble
loc_eps = 0.0000001


# ================================================================
# copula classes
# ================================================================


class GaussianCopulaConditional:
    """ """
    def __init__(self, corr_matrix=np.eye(2), conditionals=None):
        """
        Gaussian covariance matrix and variables variables list;
        target variables will be kept in 'targets' list
        """
        self.cr = pd.DataFrame(corr_matrix)
        cls = self.cr.columns

        # by default conditionals contains everything except the last component
        if conditionals is None:
            conditionals = cls[:-1]

        # the rest of components are targets
        targets = [s for s in cls if s not in conditionals]

        # keep the variables lists
        self.targets = targets
        self.conditionals = conditionals

        # covariance submatrices
        sII = np.array(self.cr.loc[conditionals, conditionals])
        sIJ = np.array(self.cr.loc[conditionals, targets])
        sJI = np.array(self.cr.loc[targets, conditionals])
        sJJ = np.array(self.cr.loc[targets, targets])

        # conditional mean and covariance
        pr = np.linalg.solve(sII, sIJ)
        self.cond_cov = sJJ - sJI.dot(pr)
        self.mupr = np.linalg.solve(sII, sIJ).T

    def make_input(self, u, u_cond):
        """ """
        if u is None:
            u = 0.5 * np.ones((1, len(self.targets)))
        if u_cond is None:
            u_cond = 0.5 * np.ones((1, len(self.conditionals)))
        u[u == 0] = loc_eps
        u[u == 1] = 1 - loc_eps
        return u, u_cond

    def pdf(self, u=None, u_cond=None):
        """
        u is 2-dim array n_obs x n_targets
        u_cond is 2-dim array 1 x n_conditionals
        """
        u, u_cond = self.make_input(u, u_cond)
        x_cond = ss.norm.ppf(u_cond)
        x = ss.norm.ppf(u)
        mn = self.mupr.dot(x_cond.T).T[0]
        if len(self.targets) <= 1:
            res = ss.norm.pdf(x, mn[0], self.cond_cov[0, 0])
            fx = ss.norm.pdf(x)
            res = pd.Series((res / fx)[:, 0], index=u[:, 0])
        else:
            res = ss.multivariate_normal.pdf(x, mean=mn, cov=self.cond_cov)
            fx = ss.norm.pdf(x).prod(axis=1)
            res = pd.Series(res / fx, index=range(x.shape[0]))
        res.name = 'Cond PDF of ' + ', '.join(self.targets)
        return res

    def cdf(self, u=None, u_cond=None):
        """
        u is 2-dim array n_obs x n_targets
        u_cond is 2-dim array 1 x n_conditionals
        """
        u, u_cond = self.make_input(u, u_cond)
        x_cond = ss.norm.ppf(u_cond)
        x = ss.norm.ppf(u)
        mn = self.mupr.dot(x_cond.T).T[0]
        if len(self.targets) <= 1:
            # univariate normal
            res = ss.norm.cdf(x, mn[0], self.cond_cov[0, 0])
            res = pd.Series(res[:, 0], index=u[:, 0])
        else:
            # mvnormcdf does not accept multiple input points
            res = np.zeros(x.shape[0])
            for i in range(x.shape[0]):
                res[i] = mvnormcdf(x[i, :], mn, self.cond_cov)
            res = pd.Series(res, index=range(u.shape[0]))
        res.name = 'Cond CDF of ' + ', '.join(self.targets)
        return res


class GaussianCopula:
    """ """

    def __init__(self, corr_matr=pd.DataFrame(np.eye(2))):
        """
        initialize copula (as two dimensional indipendent one by default)
        """
        self.set_corr(corr_matr)

    def set_corr(self, cr):
        """ set correlation matrix parameter and compute its inverse """
        self.cr = pd.DataFrame(cr)
        self.dim = self.cr.shape[0]
        cls = self.cr.columns
        self.cri = pd.DataFrame(np.linalg.inv(self.cr), columns=cls, index=cls)

    def fit(self, data):
        """
        fit the copula from data
        the data should be 'm times n' Numpy array or Pandas DataFrame,
        m = number of samples (observations), n = number of components
        """
        cr = pd.DataFrame(data).corr(method='spearman')
        self.set_corr(pearson_from_spearman(cr))

    def rvs(self, size=100000):
        """
        simulate sample for Monte Carlo method
        size = sample volume, 100,000 by default
        """
        sam = ss.multivariate_normal.rvs(cov=self.cr, size=size)
        return pd.DataFrame(ss.norm.cdf(sam), columns=self.cr.columns)

    def pdf(self, u):
        """ probability density function of the copula """
        x = ss.norm.ppf(u)
        mat = self.cri - np.eye(self.dim)
        pok = -0.5 * x.dot(mat).dot(x)
        mult = 1 / np.sqrt(np.linalg.det(self.cri))
        return mult * np.exp(pok)

    def probability(self, event_func, n_obs=100000):
        """
        calculates probability of an event A, which is defined via its
        indicator function "event_func"
        """
        u = self.rvs(size=n_obs)
        return event_func(u).mean()

    def conditional_probability(self, event_func, cond_func, n_obs=100000):
        """
        calculates conditional probability of an event A, given event B,
        which are defined via there indicator functions "event_func" and
        "cond_func", respectively
        """
        u = self.rvs(size=n_obs)
        prob_of_cond = cond_func(u).mean()
        prob_of_event_and_cond = (event_func(u) & cond_func(u)).mean()
        if prob_of_cond > 0:
            res = prob_of_event_and_cond / prob_of_cond
        else:
            res = 0
        return res

    def conditional_mean_variance_event(self, y_func, cond_func, n_obs=100000):
        """
        calculates conditional mean and variance of a random variable Y = f(U),
        given condition A, with function f defined by "y_func", and
        condition A defined by its indicator function "cond_func"
        """
        u = self.rvs(size=n_obs)
        cond_ser = cond_func(u).astype('float')
        prob_of_cond = cond_ser.mean()
        event_and_cond = (y_func(u) * cond_ser).mean()
        event2_and_cond = (y_func(u) ** 2 * cond_ser).mean()
        if prob_of_cond > 0:
            me = event_and_cond / prob_of_cond
            va = event2_and_cond / prob_of_cond - me ** 2
        else:
            me, va = 0, 0
        return me, va

    def conditional_mean_variance_vector(self, y_func, x_func, n_obs=5000):
        """
        creates an SVR model for computing expected mean of Y given X,
        where X, Y are functions of U;
        returns the model 'mod';
        to predict value of Y in a new point X, call mod.predict(X), as usual
        """
        u = self.rvs(size=n_obs)

        # expected value portion
        y = y_func(u)
        x = x_func(u)
        mod_ev = SVR()
        mod_ev.fit(x, y)
        ym = mod_ev.predict(x)

        # variance portion
        yv = (y - ym) ** 2
        mod_va = SVR()
        mod_va.fit(x, yv)

        return mod_ev, mod_va

# =================================================================
# conversion from Pearson correlations to Spearman correlations
# and vice versa
# =================================================================


def spearman_from_pearson(r):
    """ convertion for normal multivariate """
    s = 6 / np.pi * np.arcsin(0.5 * r)
    return s


def pearson_from_spearman(s):
    """ conversion for normal multivariate """
    r = 2 * np.sin(s * np.pi / 6)
    return r

from scipy.stats import mvn

# =====================================================================
# Gaussian multivariate normal cdf
# =====================================================================

informcode = {0: 'normal completion with ERROR < EPS',
              1: '''completion with ERROR > EPS and MAXPTS function values used;
                    increase MAXPTS to decrease ERROR;''',
              2: 'N > 500 or N < 1'}

def mvstdnormcdf(lower, upper, corrcoef, **kwds):
    n = len(lower)
    lower = np.array(lower)
    upper = np.array(upper)
    corrcoef = np.array(corrcoef)

    correl = np.zeros(int(n*(n-1)/2.0))  #dtype necessary?

    if (lower.ndim != 1) or (upper.ndim != 1):
        raise ValueError('can handle only 1D bounds')
    if len(upper) != n:
        raise ValueError('bounds have different lengths')
    if n==2 and corrcoef.size==1:
        correl = corrcoef
    elif corrcoef.ndim == 1 and len(corrcoef) == n*(n-1)/2.0:
        correl = corrcoef
    elif corrcoef.shape == (n,n):
        correl = corrcoef[np.tril_indices(n, -1)]
    else:
        raise ValueError('corrcoef has incorrect dimension')

    if not 'maxpts' in kwds:
        if n >2:
            kwds['maxpts'] = 10000*n

    lowinf = np.isneginf(lower)
    uppinf = np.isposinf(upper)
    infin = 2.0*np.ones(n)

    np.putmask(infin,lowinf,0)# infin.putmask(0,lowinf)
    np.putmask(infin,uppinf,1) #infin.putmask(1,uppinf)
    #this has to be last
    np.putmask(infin,lowinf*uppinf,-1)

    error, cdfvalue, inform = sp.stats.mvn.mvndst(lower,upper,infin,correl,**kwds)
    if inform:
        print('something wrong', informcode[inform], error)
    return cdfvalue

def mvnormcdf(upper, mu, cov, lower=None,  **kwds):
    upper = np.array(upper)
    if lower is None:
        lower = -np.ones(upper.shape) * np.inf
    else:
        lower = np.array(lower)
    cov = np.array(cov)
    stdev = np.sqrt(np.diag(cov)) # standard deviation vector
    #do I need to make sure stdev is float and not int?
    #is this correct to normalize to corr?
    lower = (lower - mu)/stdev
    upper = (upper - mu)/stdev
    divrow = np.atleast_2d(stdev)
    corr = cov/divrow/divrow.T
    #v/np.sqrt(np.atleast_2d(np.diag(covv)))/np.sqrt(np.atleast_2d(np.diag(covv))).T

    return mvstdnormcdf(lower, upper, corr, **kwds)

Usage

In [38]:
data = np.asarray([['factor', 'first', 'second', 'third', 'fourth'], 
                ['first', 1.00, 0.50, -0.50, 0.50], 
                ['second', 0.50, 1.0, 0.25, 0.00], 
                ['third', -0.50, 0.25, 1.0, -0.25], 
                ['fourth', 0.50, 0.00, -0.25, 1.00]])

s = pd.DataFrame(data=data[1:,1:], index=data[1:,0], columns=data[0,1:])
s = s.astype('float')
print(s)
        first  second  third  fourth
first     1.0    0.50  -0.50    0.50
second    0.5    1.00   0.25    0.00
third    -0.5    0.25   1.00   -0.25
fourth    0.5    0.00  -0.25    1.00

Single target variable

One target variable 'second'.

In [39]:
conditionals = s.columns.delete(1)
print('conditional variables {0}'.format(conditionals))
conditional variables Index([u'first', u'third', u'fourth'], dtype='object')
In [40]:
# create conditionals class
cop = GaussianCopulaConditional(s, conditionals)
In [41]:
# define condition
u_cond = 0.5 * np.ones((len(cop.targets), len(cop.conditionals)))
print(u_cond)
[[ 0.5  0.5  0.5]]

Single input point

In [42]:
# define single point u
u = np.array([[0.2]])
print(u)
[[ 0.2]]
In [43]:
# compute conditional CDF
ccdf = cop.cdf(u, u_cond)
print('---   conditional CDF   ---')
print(ccdf)
---   conditional CDF   ---
0.2    0.005787
Name: Cond CDF of second, dtype: float64
In [44]:
# compute conditional PDF
cpdf = cop.pdf(u, u_cond)
print('---   conditional PDF   ---')
print(cpdf)
---   conditional PDF   ---
0.2    0.176454
Name: Cond PDF of second, dtype: float64

Multiple input points

In [45]:
# define points u
u = np.array([[0.2], [0.5], [0.8]])
print(u)
[[ 0.2]
 [ 0.5]
 [ 0.8]]
In [46]:
# compute conditional CDF
ccdf = cop.cdf(u, u_cond)
print('---   conditional CDF   ---')
print(ccdf)
---   conditional CDF   ---
0.2    0.005787
0.5    0.500000
0.8    0.994213
Name: Cond CDF of second, dtype: float64
In [47]:
# compute conditional PDF
cpdf = cop.pdf(u, u_cond)
print('---   conditional PDF   ---')
print(cpdf)
---   conditional PDF   ---
0.2    0.176454
0.5    3.000000
0.8    0.176454
Name: Cond PDF of second, dtype: float64