from time import time
import numpy as np
import pandas as pd
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.research import run_pipeline
# the old fundamentals namespace
from quantopian.pipeline.data import morningstar
# the new fundamentals namespace
from quantopian.pipeline.data import Fundamentals
def timed_run_pipeline(pipeline, start, stop, samples=5):
    """Run a pipeline multiple times and print timing information.

    Parameters
    ----------
    pipeline : Pipeline
        The pipeline to run.
    start : datetime-like
        The start date of the pipeline.
    stop : datetime-like
        The end date of the pipeline.
    samples : int, optional
        How many times the pipeline should be run. Must be at least 1.

    Returns
    -------
    output : pd.DataFrame
        The results of the final run of the pipeline over the specified
        dates.
    times : np.ndarray[np.float64]
        The time of each run in seconds.

    Raises
    ------
    ValueError
        If ``samples`` is less than 1.
    """
    if samples < 1:
        # without at least one run, ``output`` below would be unbound
        raise ValueError('samples must be at least 1, got %r' % samples)
    times = np.empty(samples)
    for n in range(samples):
        start_time = time()
        output = run_pipeline(pipeline, start, stop)
        times[n] = runtime = time() - start_time
        print('Run %s: %.2fs' % (n + 1, runtime))
    print('Mean Runtime: %.2f\nMedian Runtime: %.2f' % (np.mean(times), np.median(times)))
    return output, times
def compare_pipelines(old_pipeline, new_pipeline, start, stop, samples=5):
    """Run two pipelines and print their timing information.

    Parameters
    ----------
    old_pipeline : Pipeline
        The old pipeline to run.
    new_pipeline : Pipeline
        The new pipeline to run.
    start : datetime-like
        The start date of the pipelines.
    stop : datetime-like
        The end date of the pipelines.
    samples : int, optional
        How many times each pipeline should be run.

    Returns
    -------
    output : pd.DataFrame
        The results of both pipelines. The columns of the old pipeline will
        be prefixed with ``old_`` and the columns of the new pipeline will
        be prefixed with ``new_``.
    """
    print('Running old pipeline:')
    old, old_times = timed_run_pipeline(
        old_pipeline,
        start,
        stop,
        samples=samples,
    )
    # Reassigning is equivalent to ``inplace=True`` here and avoids the
    # discouraged inplace-mutation API.
    old = old.rename(columns={column: 'old_' + column for column in old.columns})
    print('\nRunning new pipeline:')
    new, new_times = timed_run_pipeline(
        new_pipeline,
        start,
        stop,
        samples=samples,
    )
    new = new.rename(columns={column: 'new_' + column for column in new.columns})
    # Compare medians rather than means: a single anomalously slow run
    # should not dominate the reported speedup.
    old_time = np.median(old_times)
    new_time = np.median(new_times)
    if old_time > new_time:
        direction = 'Speedup'
        by = old_time / new_time
    else:
        direction = 'Slowdown'
        by = new_time / old_time
    print('\n%s: %.2fx' % (direction, by))
    return pd.concat([old, new], axis=1)
Here, we are comparing a query of the market cap of all assets for all of 2015.
The old market cap's performance is pretty volatile: different runs can take from 15 seconds to 30 seconds depending on the time of day and database load.
The new fundamentals performance is much more consistent, and is not affected by other users running queries.
# Time a simple market-cap query against both fundamentals namespaces.
old_market_cap_pipeline = Pipeline({'market_cap': morningstar.valuation.market_cap.latest})
new_market_cap_pipeline = Pipeline({'market_cap': Fundamentals.market_cap.latest})
market_cap = compare_pipelines(
    old_market_cap_pipeline,
    new_market_cap_pipeline,
    '2015',
    '2016',
)
Fields which are updated less frequently will experience an even more significant speedup.
# Operating income updates less often than market cap, so the new API's
# advantage is even larger here.
old_operating_income_pipeline = Pipeline(
    {'operating_income': morningstar.income_statement.operating_income.latest},
)
new_operating_income_pipeline = Pipeline(
    {'operating_income': Fundamentals.operating_income.latest},
)
operating_income = compare_pipelines(
    old_operating_income_pipeline,
    new_operating_income_pipeline,
    '2015',
    '2016',
)
# Compare a heavier workload: three one-year moving averages of
# fundamentals, built once per namespace.
old_sma_columns = {
    'mkt_cap': SimpleMovingAverage(inputs=[morningstar.valuation.market_cap], window_length=252),
    'operating_income': SimpleMovingAverage(inputs=[morningstar.income_statement.operating_income], window_length=252),
    'shares_outstanding': SimpleMovingAverage(inputs=[morningstar.valuation.shares_outstanding], window_length=252),
}
new_sma_columns = {
    'mkt_cap': SimpleMovingAverage(inputs=[Fundamentals.market_cap], window_length=252),
    'operating_income': SimpleMovingAverage(inputs=[Fundamentals.operating_income], window_length=252),
    'shares_outstanding': SimpleMovingAverage(inputs=[Fundamentals.shares_outstanding], window_length=252),
}
multi_factor = compare_pipelines(
    Pipeline(old_sma_columns),
    Pipeline(new_sma_columns),
    '2015',
    '2016',
)
There are several data points that have been corrected in the new implementation.
For Apple we can see that we now have values for March of 2015 where before we were forward filling stale values for a few days.
# Plot Apple's (sid 24) market cap over time. ``xs`` selects the
# cross-section for one asset and drops that index level in one step,
# replacing the manual ``.loc`` + index-reassignment dance.
aapl_market_cap = market_cap.xs(24, level=1)
# Name the axes object ``ax`` — ``plt`` is the conventional alias for
# matplotlib.pyplot and shadowing it is confusing.
ax = aapl_market_cap.plot(title='Apple Inc. Market Cap')
ax.set_xlabel('Date')
ax.set_ylabel('USD')
With the new system, many algorithms will see a considerable runtime improvement. Below is a pipeline adapted from this post. With the old fundamentals API, this pipeline takes considerably longer to compute and uses more memory in the process. The new fundamentals API is much faster, and also much more consistent in its performance.
The make_pipeline
function below shows how you can use the new fundamentals API as a drop-in replacement for the old fundamentals API to begin taking advantage of the performance improvements and data correctness right away.
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage
from quantopian.pipeline.data.builtin import USEquityPricing
# Approximate number of trading days in one fiscal quarter.
quarter_length = 65
# Window offset of the most recent observation.
latest = -1
# Window offset of the observation roughly one year back (four quarters).
one_year_ago = -4 * quarter_length
# Row offsets sampling one observation per quarter over the trailing
# twelve months; used to fancy-index a factor's lookback window.
ttm = np.array([-1, -quarter_length, -2 * quarter_length, -3 * quarter_length])
# The same quarterly sampling shifted back one additional year: the
# prior-year trailing-twelve-month window.
ttm_py = np.array([
    -4 * quarter_length,
    -5 * quarter_length,
    -6 * quarter_length,
    -7 * quarter_length,
])
def make_pipeline(current_ratio,
                  exchange_id,
                  financial_leverage,
                  gross_margin,
                  invested_capital,
                  is_depositary_receipt,
                  is_primary_share,
                  long_term_debt_equity_ratio,
                  market_cap,
                  morningstar_sector_code,
                  net_common_stock_issuance,
                  net_income,
                  operating_cash_flow,
                  symbol,
                  total_assets,
                  total_revenue):
    """Build a Piotroski-style scoring pipeline from fundamentals columns.

    Every parameter is a pipeline data column for the named fundamental
    field. Because the columns are passed in rather than referenced
    directly, the same pipeline can be constructed from either the old
    ``morningstar`` namespace or the new ``Fundamentals`` namespace,
    which keeps the old/new performance comparison apples-to-apples.

    Returns
    -------
    pipeline : Pipeline
        A pipeline with a single ``piotroski`` column (the summed binary
        scores), screened to liquid primary common stocks on major
        exchanges.
    """
    class ROA(CustomFactor):
        """
        Profitability - Question 1. Return on Assets (ROA)
        ROA = Net Income / Total Assets
        Score 1 if the most recent ROA is positive, 0 if negative.
        """
        window_length = 3 * quarter_length + 1
        inputs = [net_income]
        def compute(self, today, assets, out, net_income):
            # Sample one row per quarter (``ttm`` offsets) and sum for a
            # trailing-twelve-month net income.
            net_income_ttm = np.sum(net_income[ttm], axis=0)
            out[:] = (net_income_ttm > 0).astype(int)
    class ROAChange(CustomFactor):
        """
        Profitability - Question 3. Change in Return on Assets
        Score 1 if this year's ROA is higher than last year's ROA, 0 if it's
        lower.

        NOTE(review): defined but never added into ``profitability``
        below — confirm whether its omission from the score is intentional.
        """
        window_length = 7 * quarter_length + 1
        inputs = [net_income, total_assets]
        def compute(self, today, assets, out, net_income, total_assets):
            net_income_ttm = np.sum(net_income[ttm], axis=0)
            net_income_ttm_py = np.sum(net_income[ttm_py], axis=0)
            roa = net_income_ttm / total_assets[latest]
            roa_py = net_income_ttm_py / total_assets[one_year_ago]
            out[:] = (roa > roa_py).astype(int)
    class CashFlow(CustomFactor):
        """
        Profitability - Question 2. Cash Flow Return on Assets (CFROA)
        Score 1 if the most recent Operating Cash Flow is positive,
        otherwise 0.
        """
        window_length = 3 * quarter_length + 1
        inputs = [operating_cash_flow]
        def compute(self, today, assets, out, cash_flow):
            cash_flow_ttm = np.sum(cash_flow[ttm], axis=0)
            out[:] = (cash_flow_ttm > 0).astype(int)
    class EarningsQuality(CustomFactor):
        """
        Profitability - Question 4. Quality of Earnings (Accrual)
        Score 1 if Operating Cash Flow (TTM) > Net Income (TTM), otherwise 0.
        """
        window_length = 3 * quarter_length + 1
        inputs = [operating_cash_flow, net_income]
        def compute(self, today, assets, out, operating_cash_flow, net_income):
            operating_cash_flow_ttm = np.sum(operating_cash_flow[ttm], axis=0)
            net_income_ttm = np.sum(net_income[ttm], axis=0)
            out[:] = (operating_cash_flow_ttm > net_income_ttm).astype(int)
    class LongTermDebtRatioChange(CustomFactor):
        """
        Leverage - Question 5. Change in Gearing or Leverage
        Score 1 if this year's gearing (long-term debt to total assets) is
        lower or equal to last year's gearing, 0 otherwise.
        """
        window_length = 4 * quarter_length + 1
        inputs = [long_term_debt_equity_ratio, financial_leverage]
        def compute(self,
                    today,
                    assets,
                    out,
                    long_term_debt_equity_ratio,
                    financial_leverage):
            # (LT debt / equity) / (assets / equity) == LT debt / assets
            long_term_debt_assets_ratio = (
                long_term_debt_equity_ratio / financial_leverage
            )
            out[:] = (
                long_term_debt_assets_ratio[latest] <=
                long_term_debt_assets_ratio[one_year_ago]
            ).astype(int)
    class CurrentDebtRatioChange(CustomFactor):
        """
        Leverage - Question 6. Change in Working Capital (Liquidity)
        Score 1 if this year's current ratio is higher than last year's, 0 if
        it's lower
        """
        window_length = 4 * quarter_length + 1
        inputs = [current_ratio]
        def compute(self, today, assets, out, current_ratio):
            out[:] = (
                current_ratio[latest] > current_ratio[one_year_ago]
            ).astype(int)
    class SharesOutstandingChange(CustomFactor):
        """
        Leverage - Question 7. Change in Shares in Issue
        Score 1 if the net number of shares in issue this year is lower or
        equal to last year
        """
        window_length = 3 * quarter_length + 1
        inputs = [net_common_stock_issuance]
        def compute(self, today, assets, out, issuance):
            # Net issuance <= 0 over the TTM means no dilution.
            issuance_ttm = np.sum(issuance[ttm], axis=0)
            out[:] = (issuance_ttm <= 0).astype(int)
    class GrossMarginChange(CustomFactor):
        """
        Efficiency - Question 8. Change in Gross Margin
        Score 1 if this year's gross margin is higher than last year, 0 if
        it's lower.
        """
        window_length = 4 * quarter_length + 1
        inputs = [gross_margin]
        def compute(self, today, assets, out, gross_margin):
            out[:] = (
                gross_margin[latest] > gross_margin[one_year_ago]
            ).astype(int)
    class AssetsTurnoverChange(CustomFactor):
        """
        Question 9. Change in asset turnover
        Assets Turnover = Revenue / Total Assets
        Score 1 if this year's asset turnover ratio is higher compared to last
        year.

        NOTE(review): defined but never added into ``efficiency`` below —
        confirm whether its omission from the score is intentional.
        """
        window_length = 7 * quarter_length + 1
        inputs = [total_revenue, total_assets]
        def compute(self, today, assets, out, revenue, total_assets):
            revenue_ttm = np.sum(revenue[ttm], axis=0)
            revenue_ttm_py = np.sum(revenue[ttm_py], axis=0)
            assets_turnover = revenue_ttm / total_assets[latest]
            assets_turnover_py = revenue_ttm_py / total_assets[one_year_ago]
            out[:] = (assets_turnover > assets_turnover_py).astype(int)
    class UniverseFilter(CustomFactor):
        """
        Return 1.0 for the following class of assets, otherwise 0.0:
        * No Financials (103), Real Estate (104), Basic Materials (101) and
          ADR (Basic Materials are too much sensitive to exogenous
          macroeconomical shocks).
        * Only primary common stocks
        * Exclude When Distributed(WD), When Issued(WI) and VJ - usually
          companies in bankruptcy
        * Exclude Halted stocks (_V, _H)
        * Only NYSE, AMEX and Nasdaq
        * mkt cap > 5,000,000
        * invested_capital > 0 (sanity check)
        * total_assets > 0 (sanity check)
        * Avoid illiquid stock (dollar trading volume average in the last 10
          days less than 100,000)
        """
        window_length = 10
        inputs = [
            USEquityPricing.close,
            USEquityPricing.volume,
            symbol,
            exchange_id,
            market_cap,
            is_primary_share,
            is_depositary_receipt,
            morningstar_sector_code,
            invested_capital,
            total_assets,
        ]
        def compute(self,
                    today,
                    assets,
                    out,
                    close_price,
                    volume,
                    symbol,
                    exchange_id,
                    mkt_cap,
                    is_primary_share,
                    is_depositary_receipt,
                    sector_code,
                    invested_capital,
                    total_assets):
            dollar_volume_10d_avg = np.mean(close_price * volume, axis=0)
            # Avoid illiquid stock (dollar trading volume average in the last
            # 10 days less than 100,000)
            criteria = dollar_volume_10d_avg > 1e5
            # Mkt cap > 5,000,000
            criteria = criteria & (mkt_cap[-1] > 5e6)
            # Only primary Common Stock
            criteria = criteria & (is_primary_share[-1])
            # No ADR
            criteria = criteria & (~is_depositary_receipt[-1])
            # No Basic Materials
            criteria = criteria & (sector_code[-1] != 101)
            # No Financials
            criteria = criteria & (sector_code[-1] != 103)
            # No Real Estate
            criteria = criteria & (sector_code[-1] != 104)
            # Sanity check
            criteria = criteria & (invested_capital[-1] > 0)
            # Sanity check
            criteria = criteria & (total_assets[-1] > 0)
            # Exclude When Distributed(WD), When Issued(WI) and VJ (usually
            # companies in bankruptcy) and Halted stocks (V, H)
            # NOTE(review): '_PR' is also excluded by the tuple below though
            # not mentioned in the comment — confirm it is intentional.
            accept_symbol = ~symbol[-1].endswith(
                ('_PR', '_WI', '_WD', '_VJ', '_V', '_H'),
            )
            criteria = criteria & accept_symbol
            # Only NYSE, AMEX and Nasdaq
            accept_exchange = exchange_id[-1].matches('(NYS|ASE|NAS)')
            criteria = criteria & accept_exchange
            out[:] = criteria.astype(float)
    # Sum the binary answers into a single score per asset.
    profitability = ROA() + CashFlow() + EarningsQuality()
    funding = (
        LongTermDebtRatioChange() +
        CurrentDebtRatioChange() +
        SharesOutstandingChange()
    )
    efficiency = GrossMarginChange()
    piotroski = profitability + funding + efficiency
    universe_filter = UniverseFilter()
    sma_200 = SimpleMovingAverage(
        inputs=[USEquityPricing.close],
        window_length=200,
    )
    # Keep only universe members whose 200-day average price exceeds $5 and
    # whose score is finite (i.e. all inputs were available).
    screen = (
        (universe_filter >= 1.0) &
        (sma_200 > 5) &
        piotroski.isfinite()
    )
    return Pipeline(
        {'piotroski': piotroski},
        screen=screen,
    )
# Build the pipeline against the old fundamentals API. Keyword arguments
# make the field-to-parameter mapping explicit.
with_old_fundamentals = make_pipeline(
    current_ratio=morningstar.operation_ratios.current_ratio,
    exchange_id=morningstar.share_class_reference.exchange_id,
    financial_leverage=morningstar.operation_ratios.financial_leverage,
    gross_margin=morningstar.operation_ratios.gross_margin,
    invested_capital=morningstar.balance_sheet.invested_capital,
    is_depositary_receipt=morningstar.share_class_reference.is_depositary_receipt,
    is_primary_share=morningstar.share_class_reference.is_primary_share,
    long_term_debt_equity_ratio=morningstar.operation_ratios.long_term_debt_equity_ratio,
    market_cap=morningstar.valuation.market_cap,
    morningstar_sector_code=morningstar.asset_classification.morningstar_sector_code,
    net_common_stock_issuance=morningstar.cash_flow_statement.net_common_stock_issuance,
    net_income=morningstar.income_statement.net_income,
    operating_cash_flow=morningstar.cash_flow_statement.operating_cash_flow,
    symbol=morningstar.share_class_reference.symbol,
    total_assets=morningstar.balance_sheet.total_assets,
    total_revenue=morningstar.income_statement.total_revenue,
)
# Build the identical pipeline against the new fundamentals API, again
# with explicit keyword arguments.
with_new_fundamentals = make_pipeline(
    current_ratio=Fundamentals.current_ratio,
    exchange_id=Fundamentals.exchange_id,
    financial_leverage=Fundamentals.financial_leverage,
    gross_margin=Fundamentals.gross_margin,
    invested_capital=Fundamentals.invested_capital,
    is_depositary_receipt=Fundamentals.is_depositary_receipt,
    is_primary_share=Fundamentals.is_primary_share,
    long_term_debt_equity_ratio=Fundamentals.long_term_debt_equity_ratio,
    market_cap=Fundamentals.market_cap,
    morningstar_sector_code=Fundamentals.morningstar_sector_code,
    net_common_stock_issuance=Fundamentals.net_common_stock_issuance,
    net_income=Fundamentals.net_income,
    operating_cash_flow=Fundamentals.operating_cash_flow,
    symbol=Fundamentals.symbol,
    total_assets=Fundamentals.total_assets,
    total_revenue=Fundamentals.total_revenue,
)
# Time the full Piotroski pipeline under both APIs. Only 3 samples because
# each run is expensive.
piotroski = compare_pipelines(
    with_old_fundamentals,
    with_new_fundamentals,
    '2015',
    '2016',
    samples=3,
)
# Display the combined old/new results (notebook cell output).
piotroski
# Median absolute difference between the old and new scores; 0 would mean
# the two data sources agree for at least half of the rows.
(piotroski.old_piotroski - piotroski.new_piotroski).abs().median()
The small changes have improved the accuracy of the Q1500US filter by adding in assets that were excluded due to data errors.
Currently the Q1500 is slower with the new API. The old API's Q1500 definition uses a special cache to load precomputed values very quickly. Without this cache the Q1500 would be around 10 times slower than it currently is. We are actively working to implement this caching logic for the new API's Q1500.
# the Q1500US filter backed by the old data
from quantopian.pipeline.filters.morningstar import Q1500US as OldQ1500US
# the Q1500US filter backed by the new data
from quantopian.pipeline.filters.fundamentals import Q1500US as NewQ1500US
# Time the Q1500US universe filter under both APIs over two years.
# A single sample: the old implementation reads from a precomputed cache,
# so repeated runs would not add much information.
q1500us = compare_pipelines(
    Pipeline({'Q1500US': OldQ1500US()}),
    Pipeline({'Q1500US': NewQ1500US()}),
    '2015',
    '2017',
    samples=1,
)
Note that when the new version is reading from a cache, these times will be the same.
# For each date, count the assets on which the old and new Q1500
# definitions disagree, and plot that disagreement over time.
(q1500us.old_Q1500US != q1500us.new_Q1500US).groupby(level=0).sum().plot(title='Q1500 differences')
Below is a list of all of the assets that are newly included in the Q1500 from 2015 through 2016.
# Assets that the new Q1500 includes on some date where the old one did not.
in_new_but_not_old = q1500us.new_Q1500US & ~q1500us.old_Q1500US
newly_added = q1500us.index.get_level_values(1)[in_new_but_not_old].unique()
sorted(newly_added)