One of the primary benefits of the Pipeline API is that Filter, Factor, and Pipeline definitions are transferable between backtesting and research. This makes it easy to develop and analyze a Pipeline with an interactive workflow, moving the final product to the backtester only when we're ready to incorporate our work into a full trading strategy.
In this notebook, we show how to run and analyze a pipeline describing a simple long/short portfolio.
We build a Pipeline that ranks assets based on combined Value/Quality metrics, constructing a long portfolio out of the top 200 assets and a short portfolio out of the bottom 200. Ranking is performed after an initial screen that removes assets failing to meet basic liquidity and stability criteria.
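The screen-then-rank-then-combine idea can be sketched with plain pandas (a toy illustration with made-up tickers and numbers, not the Pipeline API):

```python
import pandas as pd

# Hypothetical per-asset metrics for a tiny universe.
metrics = pd.DataFrame({
    'value':   [0.10, 0.05, 0.20, 0.01],
    'quality': [0.15, 0.02, 0.30, 0.50],
}, index=['AAA', 'BBB', 'CCC', 'DDD'])

# Suppose DDD fails the liquidity screen: drop it *before* ranking, so
# its (high) quality score can't influence anyone's rank.
screen = pd.Series([True, True, True, False], index=metrics.index)
eligible = metrics[screen]

# Rank each metric independently (1 = lowest), then sum the ranks.
combined_rank = eligible['value'].rank() + eligible['quality'].rank()
```

Here CCC ends up with the highest combined rank, and DDD never appears in the output because it was masked out before ranking.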
We then use pandas and seaborn to analyze the results of our Pipeline. In our analysis we show how to do the following:

- Verify that our tradeable universe contains the expected number of assets each day.
- Count how often each asset appears in the long and short portfolios.
- Visualize those counts with seaborn.
import numpy as np
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage
class Value(CustomFactor):
    # Pre-declare inputs and window_length.
    inputs = [morningstar.income_statement.ebit,
              morningstar.valuation.enterprise_value]
    window_length = 1

    def compute(self, today, assets, out, ebit, ev):
        out[:] = ebit[-1] / ev[-1]
class Quality(CustomFactor):
    # Pre-declare inputs and window_length.
    inputs = [morningstar.operation_ratios.roe]
    window_length = 1

    def compute(self, today, assets, out, roe):
        out[:] = roe[-1]
class AvgDailyDollarVolumeTraded(CustomFactor):
    # No window_length here: it's supplied at construction time
    # (e.g. AvgDailyDollarVolumeTraded(window_length=20) below).
    inputs = [USEquityPricing.close, USEquityPricing.volume]

    def compute(self, today, assets, out, close_price, volume):
        out[:] = np.mean(close_price * volume, axis=0)
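Inside compute, each input arrives as a 2D array with one row per day in the window and one column per asset. A minimal numpy sketch of the dollar-volume arithmetic, with hypothetical prices and volumes:

```python
import numpy as np

# Hypothetical 20-day windows for 3 assets (rows = days, columns = assets).
# Constant values per asset keep the arithmetic easy to check by eye.
close = np.full((20, 3), [10.0, 20.0, 5.0])
volume = np.full((20, 3), [1000.0, 500.0, 200.0])

# The same computation the factor performs: elementwise dollar volume,
# then the mean down the time axis, leaving one value per asset.
out = np.empty(3)
out[:] = np.mean(close * volume, axis=0)
```

Averaging over `axis=0` (the time axis) is what collapses the 20-day window into a single per-asset value.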
def make_pipeline():
    """
    Create and return our pipeline.

    We break this piece of logic out into its own function to make it easier to
    test and modify in isolation.

    In particular, this function can be copy/pasted into research and run by itself.
    """
    pipe = Pipeline()

    # Basic value and quality metrics.
    value = Value()
    pipe.add(value, "value")
    quality = Quality()
    pipe.add(quality, "quality")

    # We only want to trade relatively liquid stocks.
    # Build a filter that only passes stocks that have $10,000,000 average
    # daily dollar volume over the last 20 days.
    dollar_volume = AvgDailyDollarVolumeTraded(window_length=20)
    is_liquid = (dollar_volume > 1e7)

    # We also don't want to trade penny stocks, which we define as any stock with
    # an average price of less than $5.00 over the last 200 days.
    sma_200 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=200)
    not_a_penny_stock = (sma_200 > 5)

    # Before we do any other ranking, we want to throw away these assets.
    initial_screen = (is_liquid & not_a_penny_stock)

    # Construct and add a Factor representing the combined rank of each asset by
    # our value and quality metrics (the sum of its rank by each metric).
    # By applying a mask to the rank computations, we remove any stocks that failed
    # to meet our initial criteria **before** computing ranks. This means that the
    # stock with rank 10.0 is the 10th-lowest stock that passed `initial_screen`.
    combined_rank = (
        value.rank(mask=initial_screen) +
        quality.rank(mask=initial_screen)
    )
    pipe.add(combined_rank, 'combined_rank')

    # Build Filters representing the top and bottom 200 stocks by our combined
    # ranking system. We'll use these as our tradeable universe each day.
    longs = combined_rank.top(200)
    shorts = combined_rank.bottom(200)

    # The final output of our pipeline should only include
    # the top/bottom 200 stocks by our criteria.
    pipe.set_screen(longs | shorts)
    pipe.add(longs, 'longs')
    pipe.add(shorts, 'shorts')

    return pipe
pipe = make_pipeline()
pipe.show_graph('png')
To use this pipeline in a backtest, we would register it by calling attach_pipeline(pipe) in our initialize function. Under the hood, the backtester calls run_pipeline on dynamically-sized chunks of dates, making (hopefully intelligent) tradeoffs between memory usage and execution time, and ensuring that algorithms aren't exposed to lookahead bias by gaining early access to pre-fetched data. See "Pipeline in Research: What are the runtime limits?" for an in-depth look at how this works.
In research we provide raw access to the run_pipeline function, which accepts a Pipeline object, a start_date, and an end_date.
from quantopian.research import run_pipeline
# This takes a few minutes.
results = run_pipeline(pipe, '2011', '2012')
results
# Verify that we get 400 assets in our universe each day.
assets_per_day = results.groupby(level=0).size()
assets_per_day.describe()
long_short_count_by_day = results[['longs', 'shorts']].groupby(level=0).sum()
long_short_count_by_day.describe()
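Both checks rely on the fact that run_pipeline returns a DataFrame indexed by (date, asset) pairs. The same groupby(level=0) idiom can be tried on a toy stand-in for that output (made-up dates and tickers):

```python
import pandas as pd

# A tiny stand-in for the pipeline output: two dates x two assets.
idx = pd.MultiIndex.from_product(
    [pd.to_datetime(['2011-01-03', '2011-01-04']), ['AAA', 'BBB']],
    names=['date', 'asset'],
)
toy = pd.DataFrame({'longs':  [True, False, True, False],
                    'shorts': [False, True, False, True]}, index=idx)

# Group by the date level (level=0) and count rows: universe size per day.
assets_per_day = toy.groupby(level=0).size()

# Summing the boolean columns per day counts long/short members per day.
counts_by_day = toy[['longs', 'shorts']].groupby(level=0).sum()
```

Booleans sum as 0/1, so `counts_by_day` holds integer membership counts; in the real output we expect 200 in each column every day.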
import seaborn as sns
import matplotlib.pyplot as plt
# Create a 2 x 1 vertical grid of plotting areas.
fig, axes = plt.subplots(2, 1, figsize=(16, 12))
# Compute counts of long/short appearances, grouped by asset.
long_counts_by_asset = results['longs'].groupby(level=1).sum().order()
short_counts_by_asset = results['shorts'].groupby(level=1).sum().order()
# Plot long counts on first axis.
sns.distplot(long_counts_by_asset, ax=axes[0], kde=False, vertical=True,
             axlabel="Count of Stocks with N Appearances in Longs")
# Plot short counts on second axis.
sns.distplot(short_counts_by_asset, ax=axes[1], kde=False, vertical=True,
             axlabel="Count of Stocks with N Appearances in Shorts")
# Remove grid lines from axes. They look bad with the horizontal bar chart.
axes[0].grid(False)
axes[1].grid(False)
long_every_day = long_counts_by_asset[long_counts_by_asset == 253]
for asset in long_every_day.index:
    print asset
short_every_day = short_counts_by_asset[short_counts_by_asset == 253]
for asset in short_every_day.index:
    print asset
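The `== 253` comparison works because 253 is the number of trading days in our sample, so it selects assets whose appearance count equals the day count. In isolation, on a hypothetical counts Series:

```python
import pandas as pd

# Hypothetical appearance counts per asset over a 253-day sample.
counts = pd.Series({'AAA': 253, 'BBB': 120, 'CCC': 253, 'DDD': 7})

# Assets that were in the portfolio on every single trading day.
every_day = counts[counts == 253]
```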
import pandas as pd
combined_counts = pd.DataFrame(
    data={
        'long_counts': long_counts_by_asset,
        'short_counts': short_counts_by_asset,
    },
)
combined_counts.head()
sns.jointplot(combined_counts.long_counts, combined_counts.short_counts, kind='kde')
mixed_assets = combined_counts[(combined_counts.long_counts > 0) & (combined_counts.short_counts > 0)]
mixed_assets
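The mixed-assets filter keeps only rows with a nonzero count in *both* columns, i.e. assets that spent time on both sides of the book. A self-contained sketch with hypothetical counts:

```python
import pandas as pd

# Hypothetical long/short appearance counts per asset.
combined = pd.DataFrame({
    'long_counts':  [253, 0, 12, 40],
    'short_counts': [0, 253, 30, 0],
}, index=['AAA', 'BBB', 'CCC', 'DDD'])

# Keep assets that appeared at least once on each side.
mixed = combined[(combined.long_counts > 0) & (combined.short_counts > 0)]
```

Only CCC survives here: AAA and DDD were never short, and BBB was never long.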
sns.jointplot(mixed_assets.long_counts, mixed_assets.short_counts, kind='kde')