This is an attempt to document how I tried, and failed, to run a backtest of a pipeline algorithm in the research environment.
We use the algorithm from https://www.quantopian.com/posts/introducing-the-pipeline-api to demonstrate how to run a backtest in the environment.
A summary of the issue we saw:
NoEngineRegistered: Attempted to run a pipeline but no pipeline resources were registered.
Looking at https://github.com/quantopian/zipline/blob/master/zipline/pipeline/engine.py#L31, it seems that we need to provide get_pipeline_loader
for the algorithm:
algo_obj = TradingAlgorithm(
initialize=initialize,
handle_data=handle_data,
before_trading_start=before_trading_start,
get_pipeline_loader=???
)
Any way we can provide the pipeline_loader?
# Example data input: the backtest window, expressed as timezone-aware
# (UTC) datetimes at midnight.
from datetime import datetime
import pytz

start_date = datetime(2015, 1, 1, tzinfo=pytz.utc)
end_date = datetime(2016, 1, 1, tzinfo=pytz.utc)
The original algorithm was from https://www.quantopian.com/posts/introducing-the-pipeline-api - we changed this so this would
import attach_pipeline and pipeline_output from zipline.api instead of quantopian.algorithm,
and import other necessary API functions.
"""
This example comes from a request in the forums.
The post can be found here: https://www.quantopian.com/posts/ranking-system-based-on-trading-volume-slash-shares-outstanding
The request was:
I am stuck trying to build a stock ranking system with two signals:
1. Trading Volume/Shares Outstanding.
2. Price of current day / Price of 60 days ago.
Then rank Russell 2000 stocks every month, long the top 5%, short the bottom 5%.
"""
# STEP1: use from zipline.api instead
#from quantopian.algorithm import attach_pipeline, pipeline_output
from zipline.api import schedule_function, date_rules, time_rules, sid, symbol, symbols, \
get_datetime, order_target_percent, record, attach_pipeline, set_commission, \
order_target, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
# Create custom factor #1 Trading Volume/Shares Outstanding
class Liquidity(CustomFactor):
    """Custom factor #1: trading volume divided by shares outstanding."""

    # Only the most recent row of each input is needed.
    window_length = 1
    inputs = [USEquityPricing.volume, morningstar.valuation.shares_outstanding]

    def compute(self, today, assets, out, volume, shares):
        """Write the volume / shares-outstanding ratio of the last row into ``out``."""
        latest_volume = volume[-1]
        latest_shares = shares[-1]
        out[:] = latest_volume / latest_shares
# Create custom factor #2 Price of current day / Price of 60 days ago.
class Momentum(CustomFactor):
    """Custom factor #2: last close in the window divided by the first close."""

    # NOTE(review): with a 60-row window the first and last rows are
    # presumably 59 sessions apart, not 60 -- confirm the intended lookback.
    window_length = 60
    inputs = [USEquityPricing.close]

    def compute(self, today, assets, out, close):
        """Write the ratio of the newest to the oldest close in the window into ``out``."""
        newest_close, oldest_close = close[-1], close[0]
        out[:] = newest_close / oldest_close
# Create custom factor to calculate a market cap based on yesterday's close
# We'll use this to get the top 2000 stocks by market cap
class MarketCap(CustomFactor):
    """Market capitalisation: close price multiplied by shares outstanding.

    Used below to select the largest equities by market cap.
    """

    # Only the most recent row of each input is needed.
    window_length = 1
    inputs = [USEquityPricing.close, morningstar.valuation.shares_outstanding]

    def compute(self, today, assets, out, close, shares):
        """Write close * shares outstanding of the last row into ``out``."""
        latest_close = close[-1]
        latest_shares = shares[-1]
        out[:] = latest_close * latest_shares
def initialize(context):
    """Build and attach the 'ranked_2000' pipeline, then schedule the algorithm.

    Sets ``context.long_leverage`` / ``context.short_leverage`` and registers
    ``rebalance`` (monthly) and ``record_vars`` (daily) with the scheduler.
    """
    pipe = Pipeline()
    attach_pipeline(pipe, 'ranked_2000')

    # Filter for the top 2000 equities by MarketCap every day; this is an
    # approximation of the Russell 2000.
    top_2000 = MarketCap().top(2000)

    # The two raw factors and their ranks within the filtered universe.
    liquidity = Liquidity()
    momentum = Momentum()
    liquidity_rank = liquidity.rank(mask=top_2000)
    momentum_rank = momentum.rank(mask=top_2000)

    # Average of the two factor rankings, and the rank of that average.
    combo_raw = (liquidity_rank+momentum_rank)/2
    combo_rank = combo_raw.rank(mask=top_2000)

    # Register the output columns (order is kept as listed).
    columns = (
        ('liquidity', liquidity),
        ('momentum', momentum),
        ('liq_rank', liquidity_rank),
        ('mom_rank', momentum_rank),
        ('combo_raw', combo_raw),
        ('combo_rank', combo_rank),
    )
    for column_name, term in columns:
        pipe.add(term, column_name)

    # Only return the top 2000 companies by market cap that also have a
    # momentum factor greater than 0.
    pipe.set_screen(top_2000 & (momentum > 0))

    # Schedule the rebalance function.
    schedule_function(
        func=rebalance,
        date_rule=date_rules.month_start(days_offset=0),
        time_rule=time_rules.market_open(hours=0, minutes=30),
        half_days=True,
    )

    # Schedule the plotting/logging function.
    schedule_function(
        func=record_vars,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(),
        half_days=True,
    )

    # Leverage targets for the long and the short book.
    context.long_leverage = 2.00
    context.short_leverage = -2.00
def before_trading_start(context, data):
    """Fetch the pipeline output and pick the long and short candidates.

    Stores the raw output on ``context.output``, the first 100 rows by
    descending ``combo_rank`` on ``context.long_list`` and the last 100 rows
    on ``context.short_list``.
    """
    # Call pipeline_output to get the output.
    context.output = pipeline_output('ranked_2000')

    # NOTE(review): if the output has fewer than 200 rows the two slices
    # below overlap -- confirm this cannot happen with the screen in use.
    ranked = context.output.sort(['combo_rank'], ascending=False)
    context.long_list = ranked.iloc[:100]
    context.short_list = ranked.iloc[-100:]
def record_vars(context, data):
    """Print the first ten rows of the long and short lists, ordered by ascending combo_rank.

    Uses the single-argument ``print(...)`` form, which behaves the same on
    Python 2 and Python 3 (the bare ``print "..."`` statement is a syntax
    error on Python 3).
    """
    # Record and plot the leverage of our portfolio over time.
    # record(leverage = context.account.leverage)
    print("Long List")
    log.info("\n" + str(context.long_list.sort(['combo_rank'], ascending=True).head(10)))

    print("Short List")
    log.info("\n" + str(context.short_list.sort(['combo_rank'], ascending=True).head(10)))
# This rebalancing is called according to our schedule_function settings.
def rebalance(context, data):
    """Re-weight the portfolio to the current long and short lists.

    Every asset in ``context.long_list`` gets an equal share of
    ``context.long_leverage`` and every asset in ``context.short_list`` an
    equal share of ``context.short_leverage``. Positions held in neither list
    are closed with ``order_target(stock, 0)``.

    An empty list is skipped instead of raising ZeroDivisionError while
    computing the per-asset weight.
    """
    longs = context.long_list.index
    shorts = context.short_list.index

    if len(longs) > 0:
        long_weight = context.long_leverage / float(len(longs))
        for long_stock in longs:
            log.info("ordering longs: %s" % (long_stock))
            log.info("weight is %s" % (long_weight))
            order_target_percent(long_stock, long_weight)

    if len(shorts) > 0:
        short_weight = context.short_leverage / float(len(shorts))
        for short_stock in shorts:
            log.info("ordering shorts> %s" % (short_stock))
            log.info("weight is %s" % (short_weight))
            order_target_percent(short_stock, short_weight)

    # Iterate the mapping directly (yields its keys) rather than calling the
    # Python-2-only iterkeys().
    for stock in context.portfolio.positions:
        if stock not in longs and stock not in shorts:
            order_target(stock, 0)
# also mock out the Log object
class log(object):
    """Minimal stand-in for the ``log`` object used by the algorithm code."""

    @staticmethod
    def info(output):
        """Print ``output`` to standard output.

        Written as ``print(output)`` so the mock works on both Python 2 and
        Python 3; the bare ``print output`` statement is a syntax error on
        Python 3.
        """
        print(output)
Redefining initialize - note that we also limit the number of equities that would be added
from zipline.finance import commission
def create_pipeline(top_size=100):
    """Build the liquidity/momentum ranking pipeline.

    ``top_size`` is the number of equities kept by the ``MarketCap().top()``
    filter; the factor ranks use that filter as their mask.

    Returns the Pipeline with the columns 'liquidity', 'momentum',
    'liq_rank', 'mom_rank', 'combo_raw' and 'combo_rank', screened to the
    filtered universe with a momentum factor greater than 0.
    """
    # Filter for the top `top_size` equities by MarketCap every day.
    top_universe = MarketCap().top(top_size)

    # The two raw factors.
    liquidity = Liquidity()
    momentum = Momentum()

    # Rank each factor within the filtered universe.
    liquidity_rank = liquidity.rank(mask=top_universe)
    momentum_rank = momentum.rank(mask=top_universe)

    # Average of the two factor rankings, and the rank of that average.
    combo_raw = (liquidity_rank+momentum_rank)/2
    combo_rank = combo_raw.rank(mask=top_universe)

    pipe = Pipeline()
    pipe.add(liquidity, 'liquidity')
    pipe.add(momentum, 'momentum')
    pipe.add(liquidity_rank, 'liq_rank')
    pipe.add(momentum_rank, 'mom_rank')
    pipe.add(combo_raw, 'combo_raw')
    pipe.add(combo_rank, 'combo_rank')

    # Only return companies inside the filtered universe that also have a
    # momentum factor greater than 0.
    pipe.set_screen(top_universe & (momentum > 0))
    return pipe
# NOTE: redefined from above
def initialize_fixed(context):
    """Variant of ``initialize`` that builds its pipeline via ``create_pipeline``.

    Attaches the pipeline as 'ranked_2000', schedules ``rebalance`` (monthly)
    and ``record_vars`` (daily), and sets the leverage targets plus
    ``context.last_pipeline_index``.
    """
    attach_pipeline(create_pipeline(), 'ranked_2000')

    # Leverage targets for the long and the short book.
    context.long_leverage = 1.00
    context.short_leverage = -1.00

    # Starting position into the precomputed pipeline results; this value is
    # read by before_trading_start_fixed.
    context.last_pipeline_index = 0

    # Schedule the rebalance function.
    schedule_function(
        func=rebalance,
        date_rule=date_rules.month_start(days_offset=0),
        time_rule=time_rules.market_open(hours=0, minutes=30),
        half_days=True,
    )

    # Schedule the plotting/logging function.
    schedule_function(
        func=record_vars,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(),
        half_days=True,
    )
from quantopian.research import run_pipeline

# Build a small pipeline (top 20 by market cap) and render its graph.
pipe = create_pipeline(top_size=20)
pipe.show_graph(format='png')

# Evaluate the pipeline over the backtest window; run_pipeline is given the
# dates as ISO-8601 strings.
pipeline_data = run_pipeline(
    pipe,
    start_date=start_date.isoformat(),
    end_date=end_date.isoformat(),
)
pipeline_data
Let's get the data we need to run the backtest
# We should narrow the universe down to about the top 1000 equities.
# The universe is every distinct value on level 1 of the pipeline result index.
universe = list(set(pipeline_data.index.get_level_values(1)))
assert len(universe) < 5000, "Universe is too big: {}".format(len(universe))
len(universe)

# Symbols to leave out of the pricing request. Some symbols are ambiguous,
# e.g. the error seen for AGN was:
#   Ambiguous ownership for 1 symbol, multiple assets held the following symbols:
#   AGN: intersections: (('2002-01-01 00:00:00', '2015-03-16 00:00:00'),)
# An empty set (not `{}`, which is an empty dict) keeps the filter a no-op;
# use e.g. {'AGN'} to exclude a symbol.
EXCLUDE_STOCKS = set()
universe = [s for s in universe if s.symbol not in EXCLUDE_STOCKS]

# https://www.quantopian.com/help#quantopian_research_get_pricing
data = get_pricing(
    universe,
    start_date=start_date,
    end_date=end_date,
    # frequency='minute',
    symbol_reference_date=start_date,
    fields=['price', 'close_price'],
)
from zipline import TradingAlgorithm

# First attempt: run the algorithm with the pipeline-based
# before_trading_start. No pipeline loader is supplied.
# http://www.zipline.io/appendix.html
algo_obj = TradingAlgorithm(
    start=start_date,
    end=end_date,
    # data_frequency='minute',
    initialize=initialize_fixed,
    # handle_data=handle_data,
    before_trading_start=before_trading_start,
    # XXX: todo, can this be working?
    # get_pipeline_loader=XXX
)

# Run the algorithm on the pricing data with its axes reversed.
returns = algo_obj.run(
    data.transpose(2, 1, 0),
    overwrite_sim_params=False,
)
Hmm... we get an exception when running the backtest:
NoEngineRegistered: Attempted to run a pipeline but no pipeline resources were registered
before_trading_start
# Redefine before_trading_start so we use the existing pipeline data
def before_trading_start_fixed(context, data):
    """Pick the long/short lists from the precomputed ``pipeline_data``.

    Replaces the ``pipeline_output('ranked_2000')`` call, which raises
    NoEngineRegistered in this environment, with a lookup into the
    module-level ``pipeline_data`` frame returned by ``run_pipeline``.

    Sets ``context.output``, ``context.long_list`` and ``context.short_list``
    and updates ``context.last_pipeline_index`` whenever the lookup succeeds.
    """
    # Imported locally: the file only imports `datetime` from this module.
    from datetime import timedelta

    try:
        # NOTE(review): presumably the +1 day aligns the simulation date with
        # the date label used in pipeline_data -- confirm.
        t = get_datetime().date() + timedelta(days=1)
        context.output = pipeline_data.loc[t]
        context.last_pipeline_index = pipeline_data.index.get_loc(t)
    except KeyError:
        # If we cannot find the specific date in the pipeline data, fall back
        # to the last location that was found.
        last_index = context.last_pipeline_index
        if isinstance(last_index, int):
            # The initial value set in initialize_fixed is a plain int; wrap
            # it in a list so iloc returns a DataFrame, not a single-row Series.
            last_index = [last_index]
        # iloc replaces the deprecated irow positional accessor.
        context.output = pipeline_data.iloc[last_index].reset_index(level=0, drop=True)

    # NOTE(review): DataFrame.sort was removed in newer pandas releases
    # (sort_values is the replacement); kept for the pandas version in use.
    ranked = context.output.sort(['combo_rank'], ascending=False)
    total = len(ranked)

    # At most 100 names per side. Floor division keeps the count an integer
    # on Python 3 as well, and the count is clamped at zero.
    number_of_stocks = max(min(total // 2 - 1, 100), 0)

    context.long_list = ranked.iloc[:number_of_stocks]
    # Slice from an explicit start: `iloc[-0:]` would return the whole frame
    # when number_of_stocks is 0.
    context.short_list = ranked.iloc[total - number_of_stocks:]
from zipline import TradingAlgorithm

# Second attempt: same algorithm, but before_trading_start_fixed reads the
# precomputed pipeline data instead of calling pipeline_output.
# http://www.zipline.io/appendix.html
algo_obj = TradingAlgorithm(
    start=start_date,
    end=end_date,
    data_frequency='daily',
    initialize=initialize_fixed,
    # handle_data=handle_data,
    before_trading_start=before_trading_start_fixed,
    # get_pipeline_loader=run_pipeline
)

# Run the algorithm on the pricing data with its axes reversed.
results = algo_obj.run(
    data.transpose(2, 1, 0),
    overwrite_sim_params=True,
)
Check with tearsheet data on the algorithm
Using pyfolio - https://quantopian.github.io/pyfolio/
Examples - https://github.com/quantopian/pyfolio/blob/master/pyfolio/examples/zipline_algo_example.ipynb
import pyfolio as pf

# Split the zipline results into the pieces the tear sheet consumes.
extracted = pf.utils.extract_rets_pos_txn_from_zipline(results)
returns, positions, transactions, gross_lev = extracted

# Render the full tear sheet for the backtest.
pf.create_full_tear_sheet(
    returns,
    positions=positions,
    transactions=transactions,
    gross_lev=gross_lev,
)