Notebook

Running Pipeline algorithm in research

This is an attempt to document how I tried, and initially failed, to run a pipeline-based algorithm as a backtest in the research environment.

We use the algorithm from https://www.quantopian.com/posts/introducing-the-pipeline-api to demonstrate how to run a backtest in this environment.

In summary, the issue we saw is:

NoEngineRegistered: Attempted to run a pipeline but no pipeline resources were registered.

Looking at https://github.com/quantopian/zipline/blob/master/zipline/pipeline/engine.py#L31 it seems like we need to provide a get_pipeline_loader for the algorithm:

algo_obj = TradingAlgorithm(
    initialize=initialize,
    handle_data=handle_data,
    before_trading_start=before_trading_start,
    get_pipeline_loader=???
)

Any way we can provide the pipeline_loader?

In [1]:
# example data input: any way to improve this?
from datetime import datetime
import pytz

# Backtest window: the full 2015 calendar year, bounded at UTC midnight.
start_date = datetime(2015, 1, 1, tzinfo=pytz.utc)
end_date = datetime(2016, 1, 1, tzinfo=pytz.utc)

Step1 - rename quantopian.algorithm into zipline.api

The original algorithm was from https://www.quantopian.com/posts/introducing-the-pipeline-api - we changed the imports so this code would run in this environment.

And import the other necessary API functions.

In [2]:
"""
This example comes from a request in the forums. 
The post can be found here: https://www.quantopian.com/posts/ranking-system-based-on-trading-volume-slash-shares-outstanding

The request was: 

I am stuck trying to build a stock ranking system with two signals:
1. Trading Volume/Shares Outstanding.
2. Price of current day / Price of 60 days ago.
Then rank Russell 2000 stocks every month, long the top 5%, short the bottom 5%.

"""
# STEP1: use from zipline.api instead
#from quantopian.algorithm import attach_pipeline, pipeline_output
from zipline.api import schedule_function, date_rules, time_rules, sid, symbol, symbols, \
        get_datetime, order_target_percent, record, attach_pipeline, set_commission, \
        order_target, pipeline_output

from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar


# Create custom factor #1 Trading Volume/Shares Outstanding
class Liquidity(CustomFactor):
    """Turnover factor: most recent daily volume divided by shares outstanding."""

    # One-day window over daily volume and shares outstanding.
    inputs = [USEquityPricing.volume, morningstar.valuation.shares_outstanding]
    window_length = 1

    def compute(self, today, assets, out, volume, shares):
        # window_length == 1, so the last row is the only (latest) row.
        latest_volume = volume[-1]
        latest_shares = shares[-1]
        out[:] = latest_volume / latest_shares

# Create custom factor #2 Price of current day / Price of 60 days ago.
class Momentum(CustomFactor):
    """60-day momentum: latest close divided by the close 60 sessions ago."""

    # 60-session window of closing prices.
    inputs = [USEquityPricing.close]
    window_length = 60

    def compute(self, today, assets, out, close):
        newest_close = close[-1]
        oldest_close = close[0]
        out[:] = newest_close / oldest_close
        
# Create custom factor to calculate a market cap based on yesterday's close.
# We'll use this to get the top stocks by market cap.
class MarketCap(CustomFactor):
    """Approximate market cap: yesterday's close times shares outstanding."""

    # One-day window over close and shares outstanding.
    inputs = [USEquityPricing.close, morningstar.valuation.shares_outstanding]
    window_length = 1

    def compute(self, today, assets, out, close, shares):
        latest_close = close[-1]
        latest_shares = shares[-1]
        out[:] = latest_close * latest_shares
        

def initialize(context):
    """Build and attach the ranking pipeline, then schedule the rebalance and logging jobs."""
    pipe = Pipeline()
    attach_pipeline(pipe, 'ranked_2000')

    # Raw factors: daily turnover and 60-day price momentum.
    liq = Liquidity()
    mom = Momentum()
    pipe.add(liq, 'liquidity')
    pipe.add(mom, 'momentum')

    # Filter down to the top 2000 equities by market cap every day -
    # an approximation of the Russell 2000.
    russell_proxy = MarketCap().top(2000)

    # Rank each factor within the filtered universe.
    liq_rank = liq.rank(mask=russell_proxy)
    mom_rank = mom.rank(mask=russell_proxy)
    pipe.add(liq_rank, 'liq_rank')
    pipe.add(mom_rank, 'mom_rank')

    # Combine the factors: average of the two ranks, then rank the combination.
    combo_raw = (liq_rank + mom_rank) / 2
    pipe.add(combo_raw, 'combo_raw')
    pipe.add(combo_raw.rank(mask=russell_proxy), 'combo_rank')

    # Screen: only universe members with positive momentum are returned.
    pipe.set_screen(russell_proxy & (mom > 0))

    # Schedule the monthly rebalance 30 minutes after the open.
    schedule_function(func=rebalance,
                      date_rule=date_rules.month_start(days_offset=0),
                      time_rule=time_rules.market_open(hours=0, minutes=30),
                      half_days=True)

    # Schedule the daily logging/plotting function at the close.
    schedule_function(func=record_vars,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_close(),
                      half_days=True)

    # Gross leverage targets for each side of the book.
    context.long_leverage = 2.00
    context.short_leverage = -2.00
    
            
def before_trading_start(context, data):
    """Fetch the day's pipeline output and build the long/short candidate lists."""
    # Call pipeline_output to get the output of the attached pipeline.
    context.output = pipeline_output('ranked_2000')

    # Sort ONCE by combined rank (best first) instead of sorting the same
    # DataFrame twice as before — the two slices come from identical orderings.
    ranked = context.output.sort(['combo_rank'], ascending=False)
    context.long_list = ranked.iloc[:100]    # top 100 by combo_rank
    context.short_list = ranked.iloc[-100:]  # bottom 100 by combo_rank

def record_vars(context, data):  
    
     # Record and plot the leverage of our portfolio over time. 
#     record(leverage = context.account.leverage)
    
    print "Long List"
    log.info("\n" + str(context.long_list.sort(['combo_rank'], ascending=True).head(10)))
    
    print "Short List" 
    log.info("\n" + str(context.short_list.sort(['combo_rank'], ascending=True).head(10)))

# This rebalancing is called according to our schedule_function settings.
def rebalance(context,data):
    """Order into the current long/short lists and flatten anything that dropped out."""
    long_weight = context.long_leverage / float(len(context.long_list))
    short_weight = context.short_leverage / float(len(context.short_list))

    # (stocks, per-name target weight, log message format) for each side.
    sides = (
        (context.long_list.index, long_weight, "ordering longs: %s"),
        (context.short_list.index, short_weight, "ordering shorts> %s"),
    )
    for stocks, weight, message in sides:
        for stock in stocks:
            log.info(message % (stock,))
            log.info("weight is %s" % (weight,))
            order_target_percent(stock, weight)

    # Close out any held position that is no longer in either list.
    for held in context.portfolio.positions:
        in_long = held in context.long_list.index
        in_short = held in context.short_list.index
        if not (in_long or in_short):
            order_target(held, 0)
/usr/local/lib/python2.7/dist-packages/IPython/kernel/__main__.py:27: NotAllowedInLiveWarning: The fundamentals attribute valuation.shares_outstanding is not yet allowed in broker-backed live trading
/usr/local/lib/python2.7/dist-packages/IPython/kernel/__main__.py:50: NotAllowedInLiveWarning: The fundamentals attribute valuation.shares_outstanding is not yet allowed in broker-backed live trading
In [3]:
# also mock out the Log object
class log(object):
    """Minimal stand-in for Quantopian's `log` used by the algorithm functions.

    Fix: the original used the Python-2-only `print output` statement; calling
    print with parentheses and a single argument produces identical output on
    both Python 2 and Python 3.
    """

    @staticmethod
    def info(output):
        # Mirror log.info by writing the message to stdout.
        print(output)

Step 2 - separate out and factor out the pipeline creation

Redefining initialize - note that we also limit the number of equities that will be added.

In [4]:
from zipline.finance import commission

def create_pipeline(top_size=100):
    """Build the two-factor ranking pipeline over the top `top_size` names by market cap."""
    pipe = Pipeline()

    # Raw factors: daily turnover and 60-day price momentum.
    liq = Liquidity()
    mom = Momentum()
    pipe.add(liq, 'liquidity')
    pipe.add(mom, 'momentum')

    # Universe filter: top `top_size` equities by market cap each day
    # (with top_size=2000 this approximates the Russell 2000).
    top_universe = MarketCap().top(top_size)

    # Per-factor ranks within the universe.
    liq_rank = liq.rank(mask=top_universe)
    mom_rank = mom.rank(mask=top_universe)
    pipe.add(liq_rank, 'liq_rank')
    pipe.add(mom_rank, 'mom_rank')

    # Average of the two ranks, plus a rank of that combination.
    combo_raw = (liq_rank + mom_rank) / 2
    pipe.add(combo_raw, 'combo_raw')
    pipe.add(combo_raw.rank(mask=top_universe), 'combo_rank')

    # Screen: universe members with positive momentum only.
    pipe.set_screen(top_universe & (mom > 0))
    return pipe


# NOTE: redefined from above
def initialize_fixed(context):
    """Like initialize(), but builds the pipeline via create_pipeline()."""
    attach_pipeline(create_pipeline(), 'ranked_2000')

    # Schedule the monthly rebalance 30 minutes after the open.
    schedule_function(func=rebalance,
                      date_rule=date_rules.month_start(days_offset=0),
                      time_rule=time_rules.market_open(hours=0, minutes=30),
                      half_days=True)

    # Schedule the daily logging function at the close.
    schedule_function(func=record_vars,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_close(),
                      half_days=True)

    # Leverage targets (1x per side here, vs 2x in the original initialize).
    context.long_leverage = 1.00
    context.short_leverage = -1.00

    # Starting index for the date-lookup fallback used when replaying
    # precomputed pipeline data in before_trading_start.
    context.last_pipeline_index = 0
In [5]:
# Build a small (top-20) pipeline and render its term dependency graph as a PNG.
pipe = create_pipeline(top_size=20)
pipe.show_graph(format='png')
Out[5]:
In [6]:
from quantopian.research import run_pipeline

# Run the pipeline over the whole research window; the result (shown below) is
# a DataFrame indexed by (date, asset) with one column per added pipeline term.
pipeline_data = run_pipeline(pipe, 
             start_date=start_date.isoformat(), 
             end_date=end_date.isoformat()
            )

pipeline_data
Out[6]:
combo_rank combo_raw liq_rank liquidity mom_rank momentum
2015-01-02 00:00:00+00:00 Equity(24 [AAPL]) 19.0 19.0 19.0 0.006038 19.0 1.123128
Equity(1091 [BRK_A]) 7.0 8.5 1.0 0.000103 16.0 1.098932
Equity(3149 [GE]) 14.0 12.5 15.0 0.002293 10.0 1.028122
Equity(4151 [JNJ]) 12.0 11.5 12.0 0.001663 11.0 1.028444
Equity(5061 [MSFT]) 10.0 11.0 13.0 0.001994 9.0 1.027199
Equity(5938 [PG]) 16.0 14.0 11.0 0.001577 17.0 1.104332
Equity(8151 [WFC]) 13.0 11.5 9.0 0.001343 14.0 1.080726
Equity(8229 [WMT]) 15.0 12.5 7.0 0.000991 18.0 1.117188
Equity(8347 [XOM]) 8.0 10.5 14.0 0.002061 7.0 0.997619
Equity(11100 [BRK_B]) 9.0 10.5 6.0 0.000920 15.0 1.098596
Equity(21536 [NVS]) 6.0 8.0 4.0 0.000344 12.0 1.032323
Equity(23112 [CVX]) 11.0 11.0 17.0 0.002674 5.0 0.976212
Equity(25006 [JPM]) 18.0 14.5 16.0 0.002641 13.0 1.056362
Equity(25066 [CHL]) 3.0 4.0 2.0 0.000176 6.0 0.993078
Equity(26578 [GOOG_L]) 4.0 4.5 8.0 0.001131 1.0 0.924236
Equity(27470 [RDS_B]) 1.0 3.0 3.0 0.000202 3.0 0.926625
Equity(27487 [RDS_A]) 2.0 3.5 5.0 0.000805 2.0 0.925181
Equity(42950 [FB]) 17.0 14.0 20.0 0.006401 8.0 1.022744
Equity(46631 [GOOG]) 5.0 7.0 10.0 0.001431 4.0 0.933616
Equity(47740 [BABA]) 20.0 19.0 18.0 0.003796 20.0 1.184211
2015-01-05 00:00:00+00:00 Equity(24 [AAPL]) 19.0 19.0 20.0 0.008021 18.0 1.089308
Equity(1091 [BRK_A]) 7.0 9.0 2.0 0.000196 16.0 1.066576
Equity(3149 [GE]) 12.0 12.5 17.0 0.003476 8.0 1.002407
Equity(4151 [JNJ]) 8.0 9.5 10.0 0.001514 9.0 1.002769
Equity(5061 [MSFT]) 14.0 13.0 16.0 0.002902 10.0 1.006521
Equity(5938 [PG]) 17.0 15.0 13.0 0.002128 17.0 1.082611
Equity(8151 [WFC]) 13.0 12.5 11.0 0.001825 14.0 1.055643
Equity(8229 [WMT]) 15.0 13.0 7.0 0.001099 19.0 1.104037
Equity(8347 [XOM]) 9.0 9.5 12.0 0.001912 7.0 0.987957
Equity(11100 [BRK_B]) 11.0 10.5 6.0 0.001055 15.0 1.065747
... ... ... ... ... ... ... ...
2015-12-31 00:00:00+00:00 Equity(11100 [BRK_B]) 4.0 6.0 5.0 0.000938 7.0 1.015551
Equity(16841 [AMZN]) 20.0 20.0 20.0 0.005626 20.0 1.282131
Equity(21536 [NVS]) 2.0 3.0 4.0 0.000503 2.0 0.937560
Equity(25006 [JPM]) 13.0 11.0 12.0 0.001611 10.0 1.073525
Equity(25066 [CHL]) 1.0 1.0 1.0 0.000073 1.0 0.937241
Equity(26578 [GOOG_L]) 15.0 13.0 10.0 0.001408 16.0 1.176932
Equity(38554 [BUD]) 9.0 9.0 3.0 0.000308 15.0 1.164834
Equity(42950 [FB]) 18.0 16.0 19.0 0.004042 13.0 1.145600
Equity(46631 [GOOG]) 14.0 12.5 8.0 0.001275 17.0 1.195553
Equity(47740 [BABA]) 19.0 17.5 16.0 0.002445 19.0 1.277005
2016-01-04 00:00:00+00:00 Equity(24 [AAPL]) 12.0 12.0 20.0 0.005972 4.0 0.954053
Equity(1091 [BRK_A]) 3.0 5.0 2.0 0.000182 8.0 0.994697
Equity(3149 [GE]) 16.0 15.0 17.0 0.004593 13.0 1.130190
Equity(4151 [JNJ]) 6.0 9.0 6.0 0.001067 12.0 1.088825
Equity(5061 [MSFT]) 19.0 16.5 15.0 0.002604 18.0 1.193072
Equity(5923 [PFE]) 11.0 10.5 16.0 0.002626 5.0 0.977485
Equity(5938 [PG]) 7.0 9.0 7.0 0.001285 11.0 1.086734
Equity(8151 [WFC]) 9.0 9.5 10.0 0.001530 9.0 1.047756
Equity(8229 [WMT]) 5.0 6.5 11.0 0.001597 2.0 0.931630
Equity(8347 [XOM]) 8.0 9.0 12.0 0.001796 6.0 0.992670
Equity(11100 [BRK_B]) 4.0 6.0 5.0 0.001010 7.0 0.994573
Equity(16841 [AMZN]) 20.0 19.5 19.0 0.005780 20.0 1.247998
Equity(21536 [NVS]) 2.0 3.0 3.0 0.000319 3.0 0.939705
Equity(25006 [JPM]) 13.0 12.0 14.0 0.002594 10.0 1.062782
Equity(25066 [CHL]) 1.0 1.0 1.0 0.000070 1.0 0.923127
Equity(26578 [GOOG_L]) 14.0 12.5 9.0 0.001474 16.0 1.161194
Equity(38554 [BUD]) 10.0 9.5 4.0 0.000413 15.0 1.139546
Equity(42950 [FB]) 17.0 16.0 18.0 0.005218 14.0 1.132576
Equity(46631 [GOOG]) 15.0 12.5 8.0 0.001357 17.0 1.181752
Equity(47740 [BABA]) 18.0 16.0 13.0 0.002148 19.0 1.225675

5060 rows × 6 columns

Step 4: Getting the research pricing

Let's get the data we need to run the backtest

In [7]:
# we should narrow down to about top 1000 equities
# Collect every distinct asset that appeared in the pipeline output
# (level 1 of the (date, asset) MultiIndex).
unique_assets = set(pipeline_data.index.get_level_values(1))
universe = list(unique_assets)

assert len(universe) < 5000, "Universe is too big: {}".format(len(universe))
len(universe)
Out[7]:
31
In [8]:
# Ambiguous ownership for 1 symbol; multiple assets held the following symbols:
# AGN: intersections: (('2002-01-01 00:00:00', '2015-03-16 00:00:00'),)

# FIX: the original `{}` is an empty *dict*, not a set — the commented-out
# example {'AGN'} shows a set was intended. Membership tests behave the same
# on an empty dict, but set() is the correct type for a collection of symbols.
EXCLUDE_STOCKS = set()  # e.g. {'AGN'} to drop ambiguous symbols
universe = [s for s in universe if s.symbol not in EXCLUDE_STOCKS]
In [9]:
# https://www.quantopian.com/help#quantopian_research_get_pricing
# Fetch daily pricing for every asset in the universe over the backtest window.
# symbol_reference_date pins symbol->asset resolution to the start date.
data = get_pricing(universe,
                   start_date=start_date,
                   end_date=end_date,
                   #frequency='minute',
                   symbol_reference_date=start_date,
                   fields=['price', 'close_price'])
In [10]:
from zipline import TradingAlgorithm

#http://www.zipline.io/appendix.html
# NOTE(review): this cell is expected to FAIL. before_trading_start calls
# pipeline_output(), but no pipeline engine is registered on the algorithm
# (get_pipeline_loader is left unset), so zipline raises NoEngineRegistered —
# see the traceback captured below.
algo_obj = TradingAlgorithm(
    initialize=initialize_fixed,
    #handle_data=handle_data,
    before_trading_start=before_trading_start,
    start=start_date,
    #data_frequency='minute',
    end=end_date,
    # XXX: todo, can this be working?
    #get_pipeline_loader=XXX
)

# Run algorithms
# data comes from get_pricing; transposed to the axis order run() expects
# (presumably a pandas Panel — confirm against the get_pricing docs).
returns = algo_obj.run(
    data.transpose(2,1,0),
    overwrite_sim_params=False
)
---------------------------------------------------------------------------
NoEngineRegistered                        Traceback (most recent call last)
<ipython-input-10-3561883d09dc> in <module>()
     16 returns = algo_obj.run(
     17     data.transpose(2,1,0),
---> 18     overwrite_sim_params=False
     19 )

/build/src/qexec_repo/zipline_repo/zipline/algorithm.pyc in run(self, data, overwrite_sim_params)
    686         try:
    687             perfs = []
--> 688             for perf in self.get_generator():
    689                 perfs.append(perf)
    690 

/build/src/qexec_repo/zipline_repo/zipline/gens/tradesimulation.pyc in transform(self)
    233                     self.simulation_dt = dt
    234                     algo.on_dt_changed(dt)
--> 235                     algo.before_trading_start(self.current_data)
    236                 elif action == MINUTE_END:
    237                     handle_benchmark(dt)

/build/src/qexec_repo/zipline_repo/zipline/algorithm.pyc in before_trading_start(self, data)
    451         with handle_non_market_minutes(data) if \
    452                 self.data_frequency == "minute" else ExitStack():
--> 453             self._before_trading_start(self, data)
    454 
    455         self._in_before_trading_start = False

<ipython-input-2-a8c2a8343ba3> in before_trading_start(context, data)
    110 def before_trading_start(context, data):
    111     # Call pipelive_output to get the output
--> 112     context.output = pipeline_output('ranked_2000')
    113 
    114     # Narrow down the securities to only the top 200 & update my universe

/build/src/qexec_repo/zipline_repo/zipline/utils/api_support.pyc in wrapped(*args, **kwargs)
     49     def wrapped(*args, **kwargs):
     50         # Get the instance and call the method
---> 51         return getattr(get_algo_instance(), f.__name__)(*args, **kwargs)
     52     # Add functor to zipline.api
     53     setattr(zipline.api, f.__name__, wrapped)

/build/src/qexec_repo/zipline_repo/zipline/utils/api_support.pyc in wrapped_method(self, *args, **kwargs)
     96             if not self.initialized:
     97                 raise exception
---> 98             return method(self, *args, **kwargs)
     99         return wrapped_method
    100     return decorator

/build/src/qexec_repo/zipline_repo/zipline/algorithm.pyc in pipeline_output(self, name)
   2316                 valid=list(self._pipelines.keys()),
   2317             )
-> 2318         return self._pipeline_output(p, chunks)
   2319 
   2320     def _pipeline_output(self, pipeline, chunks):

/build/src/qexec_repo/zipline_repo/zipline/algorithm.pyc in _pipeline_output(self, pipeline, chunks)
   2327         except Expired:
   2328             data, valid_until = self._run_pipeline(
-> 2329                 pipeline, today, next(chunks),
   2330             )
   2331             self._pipeline_cache = CachedObject(data, valid_until)

/build/src/qexec_repo/zipline_repo/zipline/algorithm.pyc in _run_pipeline(self, pipeline, start_session, chunksize)
   2374 
   2375         return \
-> 2376             self.engine.run_pipeline(pipeline, start_session, end_session), \
   2377             end_session
   2378 

/build/src/qexec_repo/zipline_repo/zipline/pipeline/engine.pyc in run_pipeline(self, pipeline, start_date, end_date)
     77     def run_pipeline(self, pipeline, start_date, end_date):
     78         raise NoEngineRegistered(
---> 79             "Attempted to run a pipeline but no pipeline "
     80             "resources were registered."
     81         )

NoEngineRegistered: Attempted to run a pipeline but no pipeline resources were registered.

Hermmm..... we get an exception on backtest

NoEngineRegistered: Attempted to run a pipeline but no pipeline resources were registered

Step 6: Let's redefine before_trading_start so we use the existing pipeline data

In [ ]:
def before_trading_start_fixed(context, data):
    """Replay precomputed pipeline output instead of calling pipeline_output().

    Looks up the row of the global `pipeline_data` for the next session; if
    that date is missing, falls back to the last successfully used row.
    """
    # BUG FIX: only `datetime` was imported at the top of the notebook, so the
    # original code raised NameError on `timedelta`; import it here.
    from datetime import timedelta

    global pipeline_data
    try:
        # Pipeline rows are keyed by the *next* session, so look one day ahead.
        t = get_datetime().date() + timedelta(days=1)
        context.output = pipeline_data.loc[t]
        context.last_pipeline_index = pipeline_data.index.get_loc(t)
    except KeyError:
        # If we cannot find the specific date in the pipeline, reuse the last
        # row we saw (dropping the date level so the frame is asset-indexed).
        context.output = pipeline_data.irow(context.last_pipeline_index).reset_index(level=0, drop=True)

    # Narrow down to at most 100 names per side (fewer if the output is small).
    # `//` keeps integer semantics identical on both Python 2 and 3.
    number_of_stocks = min(len(context.output)//2 - 1, 100)
    # Sort once by combined rank instead of sorting the same frame twice.
    ranked = context.output.sort(['combo_rank'], ascending=False)
    context.long_list = ranked.iloc[:number_of_stocks]
    context.short_list = ranked.iloc[-number_of_stocks:]
In [ ]:
from zipline import TradingAlgorithm

#http://www.zipline.io/appendix.html
# Same construction as before, but with before_trading_start_fixed, which reads
# the precomputed pipeline_data instead of calling pipeline_output() — this is
# what avoids the NoEngineRegistered error.
algo_obj = TradingAlgorithm(
    initialize=initialize_fixed,
    #handle_data=handle_data,
    before_trading_start=before_trading_start_fixed,
    start=start_date,
    data_frequency='daily',
    end=end_date,
    #get_pipeline_loader=run_pipeline
)

# Run algorithms
results = algo_obj.run(
    data.transpose(2,1,0),
    overwrite_sim_params=True
)

Success! Let's run some tearsheet to confirm...

Check with tearsheet data on the algorithm

Using pyfolio - https://quantopian.github.io/pyfolio/

Examples - https://github.com/quantopian/pyfolio/blob/master/pyfolio/examples/zipline_algo_example.ipynb

In [ ]:
import pyfolio as pf

# Unpack the zipline backtest result into the daily returns, positions,
# transactions and gross leverage series that pyfolio expects.
returns, positions, transactions, gross_lev = pf.utils.extract_rets_pos_txn_from_zipline(results)
In [ ]:
# Render the full pyfolio tear sheet for the backtest results above.
pf.create_full_tear_sheet(returns, positions=positions, 
                          transactions=transactions,
                          gross_lev=gross_lev
                         )