Hi all
I am bringing my code previously developed and tested in Notebook to IDE. Now backtest starts and after long waiting time I run into TimeOut error for make_pipeline() function on the line with out[:] = . In Research notebook it worked out.
My hunch is that in make_pipeline() it first does regress for all tickers and this computationally complex step takes too long. If I am right maybe I can edit this bit
slope_adj = SlopeAdj(inputs=[USEquityPricing.close], window_length=150)
just to apply to the StaticSids
The StaticSids are used because StaticAssets are not well suited for IDE but works ok in Notebook
The IDE code is below. Any feedback is welcome
from quantopian.algorithm import order_optimal_portfolio
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
import quantopian.optimize as opt
from quantopian.pipeline.factors import CustomFactor
# import StaticAssets
from quantopian.pipeline.filters import StaticAssets, StaticSids
import pandas as pd
import numpy as np
# import what I need for lin regressions
# Import libraries
from statsmodels import regression
import statsmodels.api as sm
import math
# look up all sids and use not StaticAssets but StaticSids to make my_equities object
my_equities = StaticSids([
8554, #SPY
19920, #QQQ
33802, #USRT
28368, #SLV
23870 #IEF
])
def initialize(context):
# Schedule our rebalance function to run at the start of
# each month, when the market opens.
schedule_function(
my_rebalance,
date_rules.month_start(),
time_rules.market_open()
)
# Create our pipeline and attach it to our algorithm.
my_pipe = make_pipeline()
attach_pipeline(my_pipe, 'my_pipeline')
class SlopeAdj(CustomFactor):
# Default inputs
inputs = [USEquityPricing.close]
window_length = 150
# Compute SLope Adj
def compute(self, today, assets, out, close):
#window_length = 150
def linreg(y):
# define X - just index on X axis
x = len(y)
X = range(1, (x + 1), 1)
X = sm.add_constant(X) # aparently needed. adds column of ones to array
# define Y
Y = y.tolist()
flat_list_prices = []
for item in Y:
flat_list_prices.append(item) # don't need the [0]
Y = [math.log(i) for i in flat_list_prices]
# do linear reg
model = regression.linear_model.OLS(Y, X).fit()
slope = model.params[1] # this is how to access b slope parameter in the model object
return slope
out[:] = np.apply_along_axis(linreg, 0, close)
def make_pipeline():
"""
Create our pipeline.
"""
slope_adj = SlopeAdj(inputs=[USEquityPricing.close], window_length=150)
positive_slope = slope_adj > 0
is_tradable = positive_slope & my_equities
slope_adj_top_2 = slope_adj.top(2, mask=is_tradable)
longs = slope_adj_top_2
return Pipeline(
columns={
'slope_adj': slope_adj,
'longs': longs},
screen = is_tradable & slope_adj_top_2
)
def compute_target_weights(context, data):
"""
Compute ordering weights.
"""
# Initialize empty target weights dictionary.
# This will map securities to their target weight.
weights = {}
# If there are securities in our longs and shorts lists,
# compute even target weights for each security.
if context.longs:
long_weight = 1.0 / len(context.longs)
else:
return weights
# Exit positions in our portfolio if they are not
# in our longs list
for security in context.portfolio.positions:
if security not in context.longs and data.can_trade(security):
weights[security] = 0
for security in context.longs:
weights[security] = long_weight
return weights
def before_trading_start(context, data):
"""
Get pipeline results.
"""
# Gets our pipeline output every day.
pipe_results = pipeline_output('my_pipeline')
# Go long in securities for which the 'longs' value is True,
# and check if they can be traded.
context.longs = []
for sec in pipe_results[pipe_results['longs']].index.tolist():
if data.can_trade(sec):
context.longs.append(sec)
def my_rebalance(context, data):
"""
Rebalance monthly.
"""
# Calculate target weights to rebalance
target_weights = compute_target_weights(context, data)
# If we have target weights, rebalance our portfolio
if target_weights:
order_optimal_portfolio(
objective=opt.TargetWeights(target_weights),
constraints=[],
)