# Imports needed to create and run pipeline
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline
# Import any pipeline data we want to use. These are most common.
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals # Morningstar
from quantopian.pipeline.data import factset # Factset
# Import any built in filters we want to use
from quantopian.pipeline.filters import StaticAssets
# Import these just in case
import numpy as np
import pandas as pd
# Check the numpy version. Later versions have a nicer 'unique' method which can be applied over an axis.
# Looks like numpy isn't new enough, so we'll need to map the function across columns manually.
# (Bare expression: in a notebook cell this displays the version string.)
np.__version__
# Make a filter containing a small, fixed list of securities to test with.
# NOTE(review): `symbols` is not imported in this file; presumably it is a
# built-in of the Quantopian research environment - confirm.
my_securities = StaticAssets(symbols(['T', 'VZ', 'FTR']))
# Bare expression: displays the filter object when run as a notebook cell.
my_securities
# Define our custom factor
# It returns the percent change and the days since the change
# Note that the days value is NOT the number of days since the data was reported
class DataChange(CustomFactor):
    """Report the most recent change of a data field inside the lookback window.

    Outputs (one value per asset):
        pct_change: ``new / old - 1.0`` for the most recent change, or 0.0
            when the value did not change anywhere in the window.
        days_since_change: ``window_length - row`` where ``row`` is the window
            row in which the new value first appears (so 1 means the change
            first shows up in the latest row), or NaN when there was no change.
            This counts window rows; it is NOT the number of days since the
            data was reported.

    A transition between two NaN values is not treated as a change. A
    transition from or to NaN is treated as a change, and its ``pct_change``
    is NaN.
    """

    window_length = 90
    outputs = ["pct_change", "days_since_change"]

    def compute(self, today, assets, out, data):
        """Fill ``out.pct_change`` and ``out.days_since_change`` per asset.

        ``data`` is indexed as ``data[row, asset]`` with the latest row last.
        """

        def column_change(column):
            """Return ``(pct_change, days_since_change)`` for one asset."""
            previous, current = column[:-1], column[1:]
            # Compare each row with the row before it. Looking at unique
            # values instead would miss a value that reverts to an earlier
            # one (A -> B -> A), and NaNs can be reported as several distinct
            # "unique" values depending on the numpy version.
            both_nan = np.isnan(previous) & np.isnan(current)
            changed = (current != previous) & ~both_nan
            # +1 converts an index into `current` to a row index of `column`.
            change_rows = np.flatnonzero(changed) + 1
            if change_rows.size == 0:
                # No data change within the window.
                return 0.0, np.nan
            # Only the most recent change is of interest.
            last_row = change_rows[-1]
            change = (column[last_row] / column[last_row - 1]) - 1.0
            days_since_change = self.window_length - last_row
            return change, days_since_change

        # Walk the columns explicitly: np.apply_along_axis raises when there
        # are no assets (zero columns), whereas this loop is simply a no-op.
        for asset_idx, column in enumerate(data.T):
            change, days = column_change(column)
            out.pct_change[asset_idx] = change
            out.days_since_change[asset_idx] = days
# As a check, let's make a naive change factor
# It just checks if today's value is different from yesterday's value
class DataChange_naive(CustomFactor):
    """Naive change check: compare only the last two rows of the window."""

    def compute(self, today, assets, out, data):
        """Write the fractional change from the second-to-last row to the last row."""
        previous_row = data[-2]
        latest_row = data[-1]
        out[:] = (latest_row / previous_row) - 1.0
def make_pipeline():
    """Build the pipeline that compares DataChange with the naive check.

    Columns:
        cash_change_pct / cash_change_days: DataChange outputs for
            Fundamentals.cash_and_cash_equivalents.
        cash_change_test: DataChange_naive on the same field.
        quarterly_sales_change_pct: DataChange pct_change for
            factset.Fundamentals.sales_qf.

    Every factor uses a 90-row window and is masked to ``my_securities``,
    which is also used as the pipeline screen.
    """
    # Arguments common to all three factors.
    shared_args = {'window_length': 90, 'mask': my_securities}
    cash_field = Fundamentals.cash_and_cash_equivalents

    cash = DataChange(inputs=[cash_field], **shared_args)
    cash_naive = DataChange_naive(inputs=[cash_field], **shared_args)
    sales = DataChange(inputs=[factset.Fundamentals.sales_qf], **shared_args)

    pipeline_columns = {
        'cash_change_pct': cash.pct_change,
        'cash_change_days': cash.days_since_change,
        'cash_change_test': cash_naive,
        'quarterly_sales_change_pct': sales.pct_change,
    }
    return Pipeline(columns=pipeline_columns, screen=my_securities)
# Run the pipeline with the same start and end date ('2018-3-2'), i.e. for a single day.
df = run_pipeline(make_pipeline(), '2018-3-2', '2018-3-2')
# NOTE(review): the disabled line below refers to a 'total_revenue_ttm_asof_date'
# column, which make_pipeline() does not define - looks like a leftover; confirm before re-enabling.
#df['total_revenue_ttm_asof_date'] = df['total_revenue_ttm_asof_date'].astype('datetime64[ns]')
# Bare expression: displays the result when run as a notebook cell.
df
# Notice that the naive test never senses the change in XON from 3-5 to 3-6.
# This is because fundamental data is recalculated back to the as-of date when
# new data is loaded. In this case it appears that XON's year-end (12-31)
# earnings were released and then subsequently recorded by Morningstar on 3-5.
# This was then reflected in the 3-6 data. However, the fundamental data after
# 12-31 was then all updated to the new values. Therefore, no day-to-day
# difference was observed.