...and this is the epilogue. I merged the pipeline and get_fundamentals code into one class hierarchy so that it is easy to switch from get_fundamentals to Pipeline (or vice versa) as the source for update_universe. The idea is that you can use both get_fundamentals and pipeline in the same way if you consider each of them as a big basket of securities from which you can fetch a smaller subset every day.
Useful? Maybe, maybe not. You decide. I needed the code, I wrote it :)
Initialize a UniverseProvider instance in context from a get_fundamentals query or from the name of an existing pipeline:
context.universe = PipelineOutput('mypipeline')
#or
context.universe = GetFundamentals(...query...)
After the initialization you can forget about get_fundamentals or pipeline, you can access the sids in the same way:
def before_trading_start(context, data):
    """Refresh the trading universe from the configured universe provider.

    The provider stored in ``context.universe`` may be backed by either a
    pipeline or a get_fundamentals query; this function does not care which.
    """
    provider = context.universe
    # Drop whatever was loaded before so that a new universe is fetched...
    provider.flush()
    # ...then pull the first 50 entries of it...
    provider.next(50)
    # ...and hand their sids over to the platform.
    update_universe(provider.get_sids())
The details
context.universe.next(50) # load the next 50 sids from the universe basket. It calls get_fundamentals or pipeline_output only when no universe is currently loaded (first call, after flush(), or once ALL of the previous universe has been handed out); otherwise it gives you the sids that follow the ones returned by the previous call
context.universe.flush() # discard current universe. Next time you call next() a new universe will be loaded
context.universe.get_sids() # get the list of current loaded (next (...) ) sids
context.universe.get_results() # get DataFrame of current loaded (next (...) ) sids. This contains all the columns you might have selected in get_fundamentals query or pipeline.add method
Some configuration you might set at initialization time (or whenever)
context.universe.set_shuffle_results(True/False) # shuffle the universe as soon as it is loaded
context.universe.set_sort_columns(sort_columns=['market_cap', 'whatever'], ascending=True/False) # sort the universe as soon as it is loaded
And the class you need to include in your algorithm
#################################################################
# As Q has a limit of 500 securities handled per day, if the security basket returned
# by get_fundamentals or Pipeline is larger than 500 securities and you would like to trade or scan
# all of it, you need to spread it across several days. Here is a class to do it.
class UniverseProvider(object):
    """Hand out a large basket of securities in consecutive slices.

    Subclasses supply the full basket as a DataFrame (one row per security,
    indexed by sid) through ``get_output``. Each call to ``next`` exposes the
    following slice of that basket via ``get_sids`` / ``get_results``. Once
    the whole basket has been handed out, the following ``next`` call loads a
    fresh basket.

    The class relies on two module-level names that the surrounding algorithm
    must provide: ``np`` (numpy) and ``log`` (an object with a ``debug``
    method).

    Most methods return ``self`` so that calls can be chained, e.g.
    ``provider.flush().next(50).get_sids()``.
    """

    def __init__(self, shuffle_results=False, sort_columns=None, ascending=False):
        """Store the ordering options and start with nothing loaded.

        shuffle_results -- randomly reorder the basket right after loading it.
        sort_columns    -- column name(s) to sort the basket by after loading;
                           ignored when ``shuffle_results`` is true.
        ascending       -- sort direction used together with ``sort_columns``.
        """
        self.shuffle_results = shuffle_results
        self.sort_columns = sort_columns
        self.ascending = ascending
        self.flush()

    def flush(self):
        """Discard the loaded basket and the current slice; return ``self``.

        The next call to ``next`` will load a new basket.
        """
        self.results = None
        self.stocks = None
        self.output = None
        # Keep the paging offset defined even before the first next() call.
        self.output_used = 0
        return self

    def set_shuffle_results(self, shuffle_results):
        """Enable or disable shuffling of newly loaded baskets; return ``self``."""
        self.shuffle_results = shuffle_results
        return self

    def set_sort_columns(self, sort_columns, ascending):
        """Set the sort columns and direction for new baskets; return ``self``."""
        self.sort_columns = sort_columns
        self.ascending = ascending
        return self

    def get_results(self):
        """Return the DataFrame slice selected by the last ``next`` call.

        ``None`` when nothing has been selected since construction or the
        last ``flush``.
        """
        return self.results

    def get_sids(self):
        """Return the index labels of the current slice as a list.

        ``None`` when nothing has been selected since construction or the
        last ``flush``.
        """
        return self.stocks

    def next(self, how_many_results):
        """Select the next ``how_many_results`` rows of the basket; return ``self``.

        Loads (and shuffles or sorts) a new basket through ``get_output``
        when none is currently loaded. The last slice of a basket may be
        shorter than requested.
        """
        if self.output is None:
            self.output = self._prepare_output(self.get_output())
            self.output_used = 0

        start = self.output_used
        end = start + how_many_results
        self.results = self.output.iloc[start:end, :]
        self.stocks = list(self.results.index)

        log.debug('UniverseProvider retrieved stocks %d, selected %d, offset %d' % (len(self.output.index), len(self.stocks), self.output_used))

        self.output_used = end
        # Basket exhausted: drop it so that the following call reloads.
        if self.output_used >= len(self.output.index):
            self.output = None

        return self

    def get_output(self):
        """Return the full basket as a DataFrame; subclasses must override.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError("Subclass must implement 'get_output' method")

    def _prepare_output(self, df):
        """Apply the configured ordering (shuffle wins over sort) to ``df``."""
        if self.shuffle_results:
            return df.reindex(index=np.random.permutation(df.index))
        if self.sort_columns is not None:
            # DataFrame.sort() was removed in pandas 0.20; prefer sort_values()
            # and only fall back to sort() on releases that predate it.
            if hasattr(df, 'sort_values'):
                return df.sort_values(by=self.sort_columns, ascending=self.ascending)
            return df.sort(columns=self.sort_columns, ascending=self.ascending)
        return df
class GetFundamentals(UniverseProvider):
    """Universe provider whose basket comes from a ``get_fundamentals`` query."""

    def __init__(self, query, filter_ordered_nulls = True):
        """Remember the query and the flag that is forwarded with it.

        query                -- passed as first argument to ``get_fundamentals``.
        filter_ordered_nulls -- passed as second argument to ``get_fundamentals``.
        """
        # Base initialisation first: default ordering options, nothing loaded.
        UniverseProvider.__init__(self)
        self.filter_ordered_nulls = filter_ordered_nulls
        self.query = query

    def get_output(self):
        """Run the stored query and return its result transposed.

        NOTE(review): the transpose presumably turns a one-column-per-security
        result into the one-row-per-security layout that the base class
        slices by row - confirm against the platform documentation.
        """
        fundamentals = get_fundamentals(self.query, self.filter_ordered_nulls)
        return fundamentals.transpose()
class PipelineOutput(UniverseProvider):
    """Universe provider whose basket is the output of a named pipeline."""

    def __init__(self, pipeline_name):
        """Remember which pipeline to read; ``pipeline_name`` is forwarded
        unchanged to ``pipeline_output``."""
        # Base initialisation first: default ordering options, nothing loaded.
        UniverseProvider.__init__(self)
        self.pipeline_name = pipeline_name

    def get_output(self):
        """Return whatever ``pipeline_output`` yields for the stored name."""
        return pipeline_output(self.pipeline_name)