Spreading get_fundamentals or pipeline securities basket across several days

As Q has a limit of 500 securities handled per day, if the basket of securities returned by get_fundamentals is larger than 500 and you'd like to trade (or simply scan) it all, you need to spread it across several days. Here is a class to do it.
Copy and paste the GetFundamentals class into your algorithm, initialize it, and use it. E.g.:

def initialize(context):  
    context.fundamentals = GetFundamentals(  
                        query=query(fundamentals.valuation.market_cap,  
                              fundamentals.valuation.shares_outstanding,  
                              fundamentals.company_reference.country_id)  
                         .filter(fundamentals.company_reference.country_id == "USA")  
                         .filter(fundamentals.valuation.market_cap <= 1e10)  
                         .filter(fundamentals.valuation.market_cap > 1e8)  
                         , shuffle_results = True)  

Get the next 500 from the previous results, or perform a new query if all previous results have already been returned:

def before_trading_start(context, data):  
    context.stocks = context.fundamentals.next(500).get_sids()  
    update_universe(context.stocks)  

Or simply get 50 new random results (or the first 50 if shuffle_results = False) from the basket of securities every day (flush() discards the previous query results):

def before_trading_start(context, data):  
    context.stocks = context.fundamentals.flush().next(50).get_sids()  
    update_universe(context.stocks)  

When I have some time I'll port the class to the Pipeline API.


This is the class ported to Pipeline. You can use it when, even after screening, pipeline_output still returns too many securities.

def initialize(context):  
    # ... your pipeline1 code here ...
    context.pipeline_out = PipelineOutput('pipeline1',  shuffle_results = True)  

Either get the next 500 stocks from the basket every day until the basket is empty (then it will be refilled):

def before_trading_start(context, data):  
    context.stocks = context.pipeline_out.next(500).get_sids()  
    update_universe(context.stocks)  

Or simply get 50 new random results (or the first 50 if shuffle_results = False) from the basket of securities:

def before_trading_start(context, data):  
    context.stocks = context.pipeline_out.flush().next(50).get_sids()  
    update_universe(context.stocks)  

Let me know of any bugs you might find.

Could you post a demo of this technique actually building a portfolio of 1000+ stocks? (maybe record(count=len(context.portfolio.positions)) or something).

I updated the original backtest to record the number of traded stocks (context.portfolio.positions).
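For reference, the recording itself is a one-liner; a minimal sketch (the exact placement in the updated backtest may differ):

def handle_data(context, data):  
    # chart the number of open positions so universe growth is visible  
    record(count=len(context.portfolio.positions))  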

Luca,

The problem is that update_universe does a drop/add of securities: a security is dropped unless you hold a position in it or there is an open order for it. So you can't actually get the number of stocks in your universe to grow beyond 500.

I've attached a backtest as an illustration. If you comment out this line, you'll see the 500 stock limit:

order(stock, 1, style=LimitOrder(0.0))  

This hack gets all of the stocks into data, but it takes approximately 16 trading days (~8000 stocks at 500 per day).
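A sketch of how the hack can be wired up, assuming the GetFundamentals class from above (the attached backtest may differ in detail):

def handle_data(context, data):  
    for stock in context.stocks:  
        if stock not in context.portfolio.positions:  
            # a limit price of 0.0 can never fill, but the open order  
            # keeps the stock from being dropped on the next universe update  
            order(stock, 1, style=LimitOrder(0.0))  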

The other issue is that history cannot be called from within before_trading_start.

Also, keep in mind that under live trading with Interactive Brokers, open orders are cancelled at the market close, so the hack won't work.

My conclusion is that until Quantopian makes some changes, it is futile to try to deal with ~8000 stocks in data, since their present architecture won't support it.

Grant

Grant,

I understand what you mean. I actually use that class only for scanning (not trading) a universe larger than 500 securities and selecting the most interesting ones.

This is just a workaround I use until Pipeline + "history in before_trading_start" are available in both backtesting and the contest.

So it appears you are doing random sampling at 500 per day? The idea is to analyze a new randomly picked set of 500 stocks every day?

Another thing to keep in mind is that backtests need to be repeatable, for live trading. I suggest adding a fixed seed for np.random, which should remedy the problem (and maybe run the code by Quantopian to make sure everything is copacetic).
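Something like this in initialize (the seed value is arbitrary):

import numpy as np

def initialize(context):  
    # a fixed seed makes np.random.permutation deterministic, so the  
    # shuffled universe, and hence the backtest, is repeatable  
    np.random.seed(1234)  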

I need to select stocks in a certain price range and volume range, plus other requirements that need the history function. The only way to do this is to split the basket of securities to check across several days and decide which stocks to trade in handle_data.
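Roughly like this, with illustrative thresholds (context.candidates is assumed to be a set created in initialize):

def handle_data(context, data):  
    prices  = history(20, '1d', 'price')  
    volumes = history(20, '1d', 'volume')  
    for stock in context.stocks:  
        if stock not in prices.columns:  
            continue  
        # keep only stocks inside the desired price and volume ranges  
        if 5.0 <= prices[stock].iloc[-1] <= 100.0 and volumes[stock].mean() > 1e6:  
            context.candidates.add(stock)  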

This will be fixed once Pipeline is available in the contest and the history function is available in before_trading_start. Who knows when both will happen.

Thanks for the tip about repeatable backtests, even though I don't know where that requirement is stated.

From the help page:

The use of random() isn't supported because of technical limitations with our start-of-day initialization.

As I found out, the actual requirement is that backtests need to be repeatable.

...and this is the epilogue. I merged the Pipeline and get_fundamentals code into one class so that it's possible to easily switch from get_fundamentals to Pipeline (or vice versa) as a source for update_universe. The idea is that you can use both get_fundamentals and Pipeline in the same way if you consider them as a big basket of securities from which you can fetch a smaller subset every day.

Useful? Maybe, maybe not. You decide. I needed the code, I wrote it :)

Initialize a UniverseProvider class in context from a get_fundamentals query or an existing pipeline:

context.universe = PipelineOutput('mypipeline')  
#or  
context.universe = GetFundamentals(...query...)  

After the initialization you can forget about get_fundamentals or pipeline, you can access the sids in the same way:

def before_trading_start(context, data):  
    #  
    # Fetch some new entries from our universe provider (can be pipeline or get_fundamentals)  
    #  
    update_universe( context.universe.flush().next(50).get_sids() )  

The details

context.universe.next(50)  # load the next 50 sids from the universe basket. It calls get_fundamentals or pipeline_output only when ALL of the universe has already been returned; otherwise it gives you back the next available sids after the previous call

context.universe.flush()  # discard the current universe; the next call to next() will load a new one

context.universe.get_sids()  # get the list of currently loaded (via next(...)) sids

context.universe.get_results()  # get a DataFrame of the currently loaded (via next(...)) sids. This contains all the columns you might have selected in the get_fundamentals query or via the pipeline add method
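A small sketch of get_results(), assuming the query or pipeline includes a market_cap column:

def before_trading_start(context, data):  
    df = context.universe.next(50).get_results()  
    # df is indexed by sid, with one column per selected field/factor  
    log.info('largest cap in this slice: %s' % df['market_cap'].max())  
    update_universe(context.universe.get_sids())  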

Some configuration you might set at initialization time (or whenever)

context.universe.set_shuffle_results(True/False)  # shuffle the universe as soon as it is loaded

context.universe.set_sort_columns(sort_columns=['market_cap', 'whatever'], ascending=True/False) # sort the universe as soon as it is loaded

And the class you need to include in your algorithm

#################################################################
# As Q has a limit of 500 securities handled per day, if the security basket
# returned by get_fundamentals or Pipeline is larger than 500 securities and
# you'd like to trade or scan it all, you need to spread it across several
# days. Here is a class to do it.

import numpy as np
from quantopian.algorithm import pipeline_output

class UniverseProvider(object):

    def __init__(self, shuffle_results = False, sort_columns = None, ascending = False):  
        self.shuffle_results = shuffle_results  
        self.sort_columns    = sort_columns  
        self.ascending       = ascending  
        self.flush()  


    def flush(self):  
        self.results        = None  
        self.stocks         = None  
        self.output         = None  
        return self  


    def set_shuffle_results(self, shuffle_results):  
        self.shuffle_results = shuffle_results  
        return self  


    def set_sort_columns(self, sort_columns, ascending):  
        self.sort_columns    = sort_columns  
        self.ascending       = ascending  
        return self  


    def get_results(self):  
        return self.results  


    def get_sids(self):  
        return self.stocks  


    def next(self, how_many_results):  
        if self.output is None:  
            df = self.get_output()  
            if self.shuffle_results:  
                df = df.reindex(index=np.random.permutation(df.index))  
            elif self.sort_columns is not None:  
                df = df.sort(columns=self.sort_columns, ascending=self.ascending)  
            self.output      = df  
            self.output_used = 0  
        start = self.output_used  
        end   = self.output_used + how_many_results  
        self.results = self.output.iloc[start:end,:]  
        self.stocks  = list(self.results.index)  
        log.debug('UniverseProvider retrieved stocks %d, selected %d, offset %d' % (len(self.output.index), len(self.stocks), self.output_used))  
        self.output_used += how_many_results  
        if self.output_used >= len(self.output.index):  
            self.output = None  
        return self  


    def get_output(self):  
        raise NotImplementedError("Subclass must implement 'get_output' method")  


class GetFundamentals(UniverseProvider):  
    def __init__(self, query, filter_ordered_nulls = True):  
        UniverseProvider.__init__(self)  
        self.query                = query  
        self.filter_ordered_nulls = filter_ordered_nulls  
    def get_output(self):  
        df = get_fundamentals(self.query, self.filter_ordered_nulls)  
        df = df.transpose()  
        return df

class PipelineOutput(UniverseProvider):  
    def __init__(self, pipeline_name):  
        UniverseProvider.__init__(self)  
        self.pipeline_name = pipeline_name  
    def get_output(self):  
        df = pipeline_output(self.pipeline_name)  
        return df
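For completeness, a minimal end-to-end wiring on the Pipeline side, assuming a make_pipeline() helper that builds your screened pipeline (the name 'mypipeline' is arbitrary):

from quantopian.algorithm import attach_pipeline

def initialize(context):  
    attach_pipeline(make_pipeline(), 'mypipeline')  
    context.universe = PipelineOutput('mypipeline')  
    # optional: walk the basket largest caps first  
    context.universe.set_sort_columns(sort_columns=['market_cap'], ascending=False)

def before_trading_start(context, data):  
    # fetch the next 500 sids; the basket refills once exhausted  
    update_universe(context.universe.next(500).get_sids())  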