Notebook

Constructing a Pipeline in Research

This example comes from a request in the forums. The original post can be found here: https://www.quantopian.com/posts/ranking-system-based-on-trading-volume-slash-shares-outstanding

It has also been used as the core pipeline API example. The notebook below shows how you would compute this pipeline in a research notebook.

The original request was:

I am stuck trying to build a stock ranking system with two signals:

  1. Trading Volume/Shares Outstanding.
  2. Price of current day / Price of 60 days ago.

Then rank Russell 2000 stocks every month, long the top 5%, short the bottom 5%.

In [11]:
from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.research import run_pipeline
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.data.builtin import USEquityPricing

Custom factors are written exactly the same as in the backtester.

Documentation can be found here: https://www.quantopian.com/help#quantopian_pipeline_Pipeline

In [12]:
class Liquidity(CustomFactor):   
    
    # Pre-declare inputs and window_length
    inputs = [USEquityPricing.volume, morningstar.valuation.shares_outstanding] 
    window_length = 1
    
    # Compute factor1 value
    def compute(self, today, assets, out, volume, shares):       
        out[:] = volume[-1]/shares[-1]
/usr/local/lib/python2.7/dist-packages/IPython/kernel/__main__.py:4: NotAllowedInLiveWarning: The fundamentals attribute valuation.shares_outstanding is not yet allowed in broker-backed live trading
In [13]:
class Momentum(CustomFactor):   
    
    # Pre-declare inputs and window_length
    inputs = [USEquityPricing.close] 
    window_length = 60
    
    # Compute factor2 value
    def compute(self, today, assets, out, close):       
        out[:] = close[-1]/close[0]
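
To make the array semantics concrete, here is a small NumPy sketch, run outside the Quantopian API, of the expressions used in the compute methods above. It assumes each input arrives as a 2-D array with one row per day in the window (oldest first) and one column per asset. All prices, volumes, and share counts are made up for illustration.

```python
import numpy as np

# Hypothetical 3-day window for 2 assets (rows = days, oldest first; columns = assets).
close = np.array([
    [10.0, 50.0],   # oldest day in the window, i.e. close[0]
    [11.0, 45.0],
    [12.0, 40.0],   # most recent day, i.e. close[-1]
])

# Same expression as Momentum.compute: latest close over the oldest close in the window.
momentum = close[-1] / close[0]

# Same expression as Liquidity.compute; with window_length = 1 there is a single row.
volume = np.array([[2000.0, 500.0]])
shares = np.array([[100000.0, 50000.0]])
liquidity = volume[-1] / shares[-1]

print(momentum)
print(liquidity)
```

Under this assumption about row order, a window of 60 rows puts close[0] 59 trading days before close[-1], so the ratio spans 59 daily steps rather than 60.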
In [14]:
class MarketCap(CustomFactor):   
    
    # Pre-declare inputs and window_length
    inputs = [USEquityPricing.close, morningstar.valuation.shares_outstanding] 
    window_length = 1
    
    # Compute market cap value
    def compute(self, today, assets, out, close, shares):       
        out[:] = close[-1] * shares[-1]
/usr/local/lib/python2.7/dist-packages/IPython/kernel/__main__.py:4: NotAllowedInLiveWarning: The fundamentals attribute valuation.shares_outstanding is not yet allowed in broker-backed live trading

Factors and filters are constructed the same way as in the backtester.

In [15]:
liquidity = Liquidity()
momentum = Momentum()
mkt_cap = MarketCap()

mkt_cap_rank = mkt_cap.rank(ascending=False)
# Approximate the Russell 2000: skip the 1000 largest companies, keep ranks 1001 through 3000
russell_2000 = (mkt_cap_rank > 1000) & (mkt_cap_rank <= 3000)

liquidity_rank = liquidity.rank(mask=russell_2000, ascending=False)
momentum_rank = momentum.rank(mask=russell_2000,  ascending=False)
combo_raw = (liquidity_rank+momentum_rank)/2
combo_rank = combo_raw.rank(mask=russell_2000,  ascending=True)
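
The ranking logic can be checked by hand on a toy cross-section. The sketch below uses plain pandas rather than the pipeline API. The six assets and their values are invented, and the universe band (skip the 2 largest, keep the next 3) is a scaled-down stand-in for the Russell 2000 approximation. Using where() before rank() only approximates the mask argument: it leaves assets outside the universe as NaN.

```python
import pandas as pd

# Hypothetical one-day cross-section for six assets (values are made up).
data = pd.DataFrame(
    {
        "mkt_cap":   [900.0, 800.0, 300.0, 200.0, 100.0, 50.0],
        "liquidity": [0.01, 0.02, 0.05, 0.03, 0.04, 0.06],
        "momentum":  [1.10, 0.90, 1.30, 0.95, 1.20, 1.05],
    },
    index=["A", "B", "C", "D", "E", "F"],
)

# Rank 1 = largest market cap, mirroring rank(ascending=False).
mkt_cap_rank = data["mkt_cap"].rank(ascending=False)

# Toy universe: skip the 2 largest names and keep the next 3.
universe = (mkt_cap_rank > 2) & (mkt_cap_rank <= 5)

# Rank only inside the universe; assets outside it become NaN.
liq_rank = data["liquidity"].where(universe).rank(ascending=False)
mom_rank = data["momentum"].where(universe).rank(ascending=False)

# Average the two ranks, then rank the average so that 1 is the best combined score.
combo_raw = (liq_rank + mom_rank) / 2
combo_rank = combo_raw.rank(ascending=True)

result = pd.DataFrame({"liq_rank": liq_rank, "mom_rank": mom_rank,
                       "combo_raw": combo_raw, "combo_rank": combo_rank})
print(result)
```

In this toy data only C, D, and E fall inside the universe, so only those three rows receive ranks.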

Pipelines are also constructed in exactly the same way as in the backtester. This can be done one column at a time...

In [16]:
pipe = Pipeline()
       
pipe.add(liquidity, 'liquidity')
pipe.add(momentum, 'momentum')
pipe.add(liquidity_rank, 'liq_rank')
pipe.add(momentum_rank, 'mom_rank')
pipe.add(combo_raw, 'combo_raw')
pipe.add(combo_rank, 'combo_rank')
pipe.set_screen(russell_2000 & (momentum.eq(momentum)))

...or all at once.

In [17]:
pipe = Pipeline(
    columns={'mkt_cap':mkt_cap, 
             'momentum':momentum, 
             'liquidity':liquidity, 
             'liquidity_rank':liquidity_rank, 
             'momentum_rank':momentum_rank,
             'combo_raw':combo_raw,
             'combo_rank':combo_rank
            },
    screen=(russell_2000 & (momentum.eq(momentum))), 
)
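
The momentum.eq(momentum) term in the screen looks redundant, but it appears to serve as a NaN filter: a NaN value never compares equal to itself, so the comparison is False exactly where momentum is missing. A minimal NumPy sketch of the same idea, with made-up values:

```python
import numpy as np

# Hypothetical momentum values; the middle asset has no data.
momentum = np.array([1.2, np.nan, 0.8])

# NaN is never equal to itself, so this is False only for missing values.
keep = momentum == momentum

# Equivalent, more explicit spelling in NumPy.
keep_explicit = ~np.isnan(momentum)

print(keep)
print(momentum[keep])
```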

The show_graph() method of a pipeline object draws a graph showing how the pipeline is computed.

In [18]:
pipe.show_graph(format='png')
Out[18]:

run_pipeline will produce the output of your pipeline.

In research, you must specify a start and end date. The result is a hierarchical DataFrame that includes the pipeline output for every day in the range. Each day's rows are the results the pipeline would generate in before_trading_start on that day, using data from the day before. This differs from the backtester, where on any given day only that day's results are available.
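
As a sketch of what a hierarchical result looks like and how to slice it, the example below builds a small DataFrame by hand with a two-level (date, asset) index. The tickers, values, and level names "date" and "asset" are assumptions for illustration; the index that run_pipeline returns may label its levels differently.

```python
import pandas as pd

# Hand-built stand-in for a pipeline result: one row per (date, asset) pair.
dates = pd.to_datetime(["2015-11-02", "2015-11-03"])
assets = ["AAA", "BBB", "CCC"]  # hypothetical tickers
index = pd.MultiIndex.from_product([dates, assets], names=["date", "asset"])
results = pd.DataFrame({"combo_rank": [1.0, 2.0, 3.0, 2.0, 1.0, 3.0]}, index=index)

# Cross-section for a single day: what a backtest would see on that date.
day = results.xs(pd.Timestamp("2015-11-03"), level="date")

# Wide view of one column: dates as rows, assets as columns.
wide = results["combo_rank"].unstack("asset")

print(day)
print(wide)
```

Selecting one date with xs gives a single-day table like the one a backtest receives, while unstack is convenient for following one column across the whole date range.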

In [19]:
run_pipeline(pipe, start_date='2015-11-01', end_date='2015-11-25')
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-19-8e1c740f77b7> in <module>()
----> 1 run_pipeline(pipe, start_date='2015-11-01', end_date='2015-11-25')

/build/src/extensions/extensions/main.py in run_pipeline(pipeline, start_date, end_date)
    396         start_date=adjust_date,
    397         end_date=adjust_date,
--> 398     )
    399     def run_pipeline(pipeline, start_date, end_date):
    400         """

/build/src/extensions/extensions/main.py in run_pipeline(pipeline, start_date, end_date)
    404         PipelineEngine.run_pipeline.
    405         """
--> 406         return engine.run_pipeline(pipeline, start_date, end_date)
    407     return run_pipeline
    408 

/build/src/qexec_repo/zipline_repo/zipline/pipeline/engine.pyc in run_pipeline(self, pipeline, start_date, end_date)
    171             dates,
    172             assets,
--> 173             initial_workspace={self._root_mask_term: root_mask_values},
    174         )
    175 

/build/src/qexec_repo/zipline_repo/zipline/pipeline/engine.pyc in compute_chunk(self, graph, dates, assets, initial_workspace)
    352                 loader = get_loader(term)
    353                 loaded = loader.load_adjusted_array(
--> 354                     to_load, mask_dates, assets, mask,
    355                 )
    356                 workspace.update(loaded)

/build/src/qexec_repo/zipline_repo/zipline/pipeline/loaders/equity_pricing_loader.pyc in load_adjusted_array(self, columns, dates, assets, mask)
     75             start_date,
     76             end_date,
---> 77             assets,
     78         )
     79         adjustments = self.adjustments_loader.load_adjustments(

/build/src/qexec_repo/qexec/sources/resource_proxy.pyc in method(self, *args, **kwargs)
    134     def method(self, *args, **kwargs):
    135         with self.resource as proxied:
--> 136             return getattr(proxied, name)(*args, **kwargs)
    137     return method
    138 

/build/src/qexec_repo/zipline_repo/zipline/data/us_equity_pricing.pyc in load_raw_arrays(self, columns, start_date, end_date, assets)
    560     def load_raw_arrays(self, columns, start_date, end_date, assets):
    561         # Assumes that the given dates are actually in calendar.
--> 562         start_idx = self._calendar.get_loc(start_date)
    563         end_idx = self._calendar.get_loc(end_date)
    564         first_rows, last_rows, offsets = self._compute_slices(

/build/src/qexec_repo/zipline_repo/zipline/utils/memoize.pyc in __get__(self, instance, owner)
     51             return self._cache[instance]
     52         except KeyError:
---> 53             self._cache[instance] = val = self._get(instance)
     54             return val
     55 

/build/src/qexec_repo/zipline_repo/zipline/data/us_equity_pricing.pyc in _calendar(self)
    472     @lazyval
    473     def _calendar(self):
--> 474         return DatetimeIndex(self._table.attrs['calendar'], tz='UTC')
    475 
    476     @lazyval

/usr/local/lib/python2.7/dist-packages/bcolz/attrs.pyc in __getitem__(self, name)
     93 
     94     def __getitem__(self, name):
---> 95         return self.attrs[name]
     96 
     97     def __setitem__(self, name, obj):

KeyError: 'calendar'