Quantopian's community platform is shutting down. Please read this post for more information and download your code.
Back to Community
make_pipeline() TimeOut error in IDE

Hi all

I am bringing my code previously developed and tested in Notebook to IDE. Now backtest starts and after long waiting time I run into TimeOut error for make_pipeline() function on the line with out[:] = . In Research notebook it worked out.

My hunch is that in make_pipeline() it first does regress for all tickers and this computationally complex step takes too long. If I am right maybe I can edit this bit

slope_adj = SlopeAdj(inputs=[USEquityPricing.close], window_length=150)

just to apply to the StaticSids

The StaticSids are used because StaticAssets are not well suited for IDE but works ok in Notebook

The IDE code is below. Any feedback is welcome

from quantopian.algorithm import order_optimal_portfolio  
from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline.data.builtin import USEquityPricing  
import quantopian.optimize as opt


from quantopian.pipeline.factors import CustomFactor

# import StaticAssets  
from quantopian.pipeline.filters import  StaticAssets, StaticSids  
import pandas as pd  
import numpy as np 

# import what I need for lin regressions  
# Import libraries  
from statsmodels import regression  
import statsmodels.api as sm  
import math

# look up all sids and use not StaticAssets but StaticSids to make my_equities object  
my_equities = StaticSids([  
     8554, #SPY  
     19920, #QQQ  
     33802, #USRT  
     28368, #SLV  
     23870 #IEF  
])

def initialize(context):  
    # Schedule our rebalance function to run at the start of  
    # each month, when the market opens.  
    schedule_function(  
        my_rebalance,  
        date_rules.month_start(),  
        time_rules.market_open()  
        )

    # Create our pipeline and attach it to our algorithm.  
    my_pipe = make_pipeline()  
    attach_pipeline(my_pipe, 'my_pipeline')  

class SlopeAdj(CustomFactor):  
    # Default inputs  
    inputs = [USEquityPricing.close]  
    window_length = 150

    # Compute SLope Adj  
    def compute(self, today, assets, out, close):  
        #window_length = 150  
        def linreg(y):  
            # define X - just index on X axis  
            x = len(y)  
            X = range(1, (x + 1), 1)  
            X = sm.add_constant(X) # aparently needed. adds column of ones to array  
            # define Y  
            Y = y.tolist()  
            flat_list_prices = []

            for item in Y:  
                flat_list_prices.append(item) # don't need the [0]  
            Y = [math.log(i) for i in flat_list_prices]  
            # do linear reg  
            model = regression.linear_model.OLS(Y, X).fit()  
            slope = model.params[1] # this is how to access b slope parameter in the model object  
            return slope

        out[:] = np.apply_along_axis(linreg, 0, close)  


def make_pipeline():  
    """  
    Create our pipeline.  
    """  
    slope_adj = SlopeAdj(inputs=[USEquityPricing.close], window_length=150)  
    positive_slope = slope_adj > 0  
    is_tradable = positive_slope & my_equities  
    slope_adj_top_2 = slope_adj.top(2, mask=is_tradable)  
    longs = slope_adj_top_2  
    return Pipeline(  
        columns={  
            'slope_adj': slope_adj,  
            'longs': longs},  
        screen = is_tradable & slope_adj_top_2  
        )


def compute_target_weights(context, data):  
    """  
    Compute ordering weights.  
    """

    # Initialize empty target weights dictionary.  
    # This will map securities to their target weight.  
    weights = {}

    # If there are securities in our longs and shorts lists,  
    # compute even target weights for each security.  
    if context.longs:  
        long_weight = 1.0 / len(context.longs)  
    else:  
        return weights

    # Exit positions in our portfolio if they are not  
    # in our longs list  
    for security in context.portfolio.positions:  
        if security not in context.longs and data.can_trade(security):  
            weights[security] = 0

    for security in context.longs:  
        weights[security] = long_weight

    return weights


def before_trading_start(context, data):  
    """  
    Get pipeline results.  
    """

    # Gets our pipeline output every day.  
    pipe_results = pipeline_output('my_pipeline')

    # Go long in securities for which the 'longs' value is True,  
    # and check if they can be traded.  
    context.longs = []  
    for sec in pipe_results[pipe_results['longs']].index.tolist():  
        if data.can_trade(sec):  
            context.longs.append(sec)  




def my_rebalance(context, data):  
    """  
    Rebalance monthly.  
    """

    # Calculate target weights to rebalance  
    target_weights = compute_target_weights(context, data)

    # If we have target weights, rebalance our portfolio  
    if target_weights:  
        order_optimal_portfolio(  
            objective=opt.TargetWeights(target_weights),  
            constraints=[],  
        )  
2 responses

@ Dmitry,

This may help you overcome your problems

I have moved what you are doing in before_trading_start(context, data) every day to my_rebalance(context, data) once a month and have moved pipeline into initialize(context).

# SlopeAdj TAA by Vladimir  
from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.filters import  StaticAssets  
from quantopian.pipeline.factors import CustomFactor  
from quantopian.pipeline import Pipeline  
import quantopian.optimize as opt  
from statsmodels import regression  
import statsmodels.api as sm  
import numpy as np  
import math  
# -------------------------------------------------------------------------------------------------------  
my_equities = StaticAssets(symbols('SPY','QQQ','IYR','SLV','IEF')); PERIOD = 150; LEV = 1.0; N = 2;  
# -------------------------------------------------------------------------------------------------------  
def initialize(context):  
    schedule_function(my_rebalance, date_rules.month_start(), time_rules.market_open(minutes = 65))  
    m = my_equities  
    slope_adj = SlopeAdj(inputs=[USEquityPricing.close], window_length = PERIOD, mask = m)  
    positive_slope = slope_adj > 0  
    m &= positive_slope  
    longs = slope_adj.top(N, mask = m)  
    m &= longs  
    attach_pipeline(Pipeline({'slope_adj': slope_adj, 'longs': longs}, m), 'my_pipeline')  

def my_rebalance(context, data):  
    output = pipeline_output('my_pipeline')  
    longs = output.index  
    wt = {}; long_weight = LEV / len(longs) if len(longs) > 0 else 0; 

    for sec in context.portfolio.positions:  
        if sec not in longs and data.can_trade(sec):  wt[sec] = 0  
    for sec in longs: wt[sec] = long_weight  
    # for sec, weight in wt.items(): order_target_percent(sec, weight)  
    order_optimal_portfolio(opt.TargetWeights(wt), [opt.MaxGrossExposure(LEV)])  


def before_trading_start(context, data):  
    record(leverage = context.account.leverage, pos_count = len(context.portfolio.positions))  


class SlopeAdj(CustomFactor):  
    inputs = [USEquityPricing.close]  
    window_length = PERIOD

    def compute(self, today, assets, out, close):  
        def linreg(y):  
            x = len(y)  
            X = range(1, (x + 1), 1)  
            X = sm.add_constant(X)  
            Y = y.tolist()  
            flat_list_prices = []

            for item in Y:  
                flat_list_prices.append(item)  
            Y = [math.log(i) for i in flat_list_prices]  
            model = regression.linear_model.OLS(Y, X).fit()  
            slope = model.params[1]  
            return slope

        out[:] = np.apply_along_axis(linreg, 0, close)  

@ Vladimir

Works spot on. Thanks a lot Vladimir!