Quantopian's community platform is shutting down. Please read this post for more information and download your code.
Back to Community
How To Utilize Pipeline For 8000+ Securities?

I've been doing a lot of research about the Pipeline API, and apparently you should be able to sort through the entire Quantopian database with it. The furthest I've gotten is having Pipeline sort through about 5000 securities. The goal is to have it check the entire universe and then screen out securities based on set parameters. Realistically, I'd like to call this once per day and compare the previous day's close price with today's current price. It seems that I can only get it to sort based on fundamental data, when I would like an unbiased look at every security.

Can this be done? What am I missing here?

import math  
import random  
import datetime  
import pandas as pd  
import numpy as np  
from sqlalchemy import or_  
from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline.factors import SimpleMovingAverage  
from quantopian.pipeline.factors import Latest  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.data.morningstar import valuation


class UniverseProvider(object):
    """Hand out a universe of securities in fixed-size batches.

    A subclass supplies a DataFrame through ``get_output``.  The frame is
    optionally shuffled or sorted once and cached; each call to ``next``
    then selects the following slice of rows.  Once the cached frame has
    been fully consumed it is dropped, so the next call fetches a fresh one.

    The setters, ``flush`` and ``next`` return ``self`` so calls can be
    chained, e.g. ``provider.next(500).get_sids()``.
    """

    def __init__(self, shuffle_results=False, sort_columns=None, ascending=False):
        """Store the ordering options and start with an empty cache.

        shuffle_results -- when true, rows are randomly permuted; this takes
                           precedence over ``sort_columns``.
        sort_columns    -- column name(s) to sort by when not shuffling;
                           ``None`` keeps the order returned by ``get_output``.
        ascending       -- sort direction used together with ``sort_columns``.
        """
        self.shuffle_results = shuffle_results
        self.sort_columns = sort_columns
        self.ascending = ascending
        self.flush()

    def flush(self):
        """Drop the cached frame and the last batch; return ``self``."""
        self.results = None
        self.stocks = None
        self.output = None
        # Reset the cursor as well so the attribute always exists and never
        # carries a stale offset over from a previous frame.
        self.output_used = 0
        return self

    def set_shuffle_results(self, shuffle_results):
        """Enable or disable shuffling for the next fetched frame; return ``self``."""
        self.shuffle_results = shuffle_results
        return self

    def set_sort_columns(self, sort_columns, ascending):
        """Set the sort column(s) and direction for the next fetched frame; return ``self``."""
        self.sort_columns = sort_columns
        self.ascending = ascending
        return self

    def get_results(self):
        """Return the rows selected by the last ``next`` call (``None`` before any call)."""
        return self.results

    def get_sids(self):
        """Return the index labels of the last batch as a list (``None`` before any call)."""
        return self.stocks

    def next(self, how_many_results):
        """Select the next ``how_many_results`` rows and return ``self``.

        Fetches and orders a new frame when none is cached.  The selection is
        available afterwards through ``get_results`` and ``get_sids``.
        """
        if self.output is None:
            self.output = self._prepare_output()
            self.output_used = 0
        start = self.output_used
        end = start + how_many_results
        self.results = self.output.iloc[start:end, :]
        self.stocks = list(self.results.index)
        # `log` is not defined in this file; presumably the platform-provided
        # logger -- confirm it is in scope wherever this class is used.
        log.debug('UniverseProvider retrieved stocks %d, selected %d, offset %d' % (len(self.output.index), len(self.stocks), self.output_used))
        self.output_used = end
        if self.output_used >= len(self.output.index):
            # Frame exhausted: the following call starts over with fresh data.
            self.output = None
        return self

    def _prepare_output(self):
        """Fetch the frame from ``get_output`` and apply shuffling or sorting."""
        df = self.get_output()
        if self.shuffle_results:
            return df.reindex(index=np.random.permutation(df.index))
        if self.sort_columns is not None:
            # DataFrame.sort() does not exist in newer pandas; prefer
            # sort_values() and keep sort() only as a fallback for old versions.
            if hasattr(df, 'sort_values'):
                return df.sort_values(by=self.sort_columns, ascending=self.ascending)
            return df.sort(columns=self.sort_columns, ascending=self.ascending)
        return df

    def get_output(self):
        """Return the full DataFrame to serve; must be overridden by subclasses."""
        raise NotImplementedError("Subclass must implement 'get_output' method")


class GetFundamentals(UniverseProvider):
    """Universe provider whose frame comes from a ``get_fundamentals`` call."""

    def __init__(self, query, filter_ordered_nulls = True):
        """Remember the query and the null-filter flag passed to ``get_fundamentals``."""
        UniverseProvider.__init__(self)
        self.filter_ordered_nulls = filter_ordered_nulls
        self.query = query

    def get_output(self):
        """Run the stored query and return the transposed result.

        The base class slices rows and reads the row index as the list of
        securities, so the transpose presumably moves securities from the
        columns onto the rows -- verify against ``get_fundamentals``.
        """
        raw = get_fundamentals(self.query, self.filter_ordered_nulls)
        return raw.transpose()

class PipelineOutput(UniverseProvider):
    """Universe provider whose frame is the output of a named pipeline."""

    def __init__(self, pipeline_name):
        """Remember the name that is later passed to ``pipeline_output``."""
        UniverseProvider.__init__(self)
        self.pipeline_name = pipeline_name

    def get_output(self):
        """Return ``pipeline_output`` for the stored pipeline name, unchanged."""
        return pipeline_output(self.pipeline_name)


# Put any initialization logic here.  The context object will be passed to  
# the other methods in your algorithm.  
def initialize(context):
    """Build the fundamentals-backed universe and schedule the daily close-out.

    Stores a ``GetFundamentals`` provider on ``context.universe`` and
    registers ``before_trading_close`` to run every day using the
    ``market_close`` time rule with ``minutes=60`` (half days included).
    """
    # Filters that are currently disabled:
    #   .filter(fundamentals.company_reference.country_id == "USA")
    #   .filter(or_(fundamentals.company_reference.primary_exchange_id == "NAS", fundamentals.company_reference.primary_exchange_id == "NYS"))
    fundamentals_query = query(
        fundamentals.valuation.market_cap,
        fundamentals.valuation.shares_outstanding,
        fundamentals.company_reference.primary_exchange_id,
        fundamentals.company_reference.country_id
    ).order_by(fundamentals.valuation.market_cap.desc())
    context.universe = GetFundamentals(fundamentals_query)

    schedule_function(
        func=before_trading_close,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(minutes=60),
        half_days=True
    )
def before_trading_start(context, data):
    """Advance the universe provider by 500 rows and pass those securities to ``update_universe``."""
    batch = context.universe.next(500)
    update_universe(batch.get_sids())

# Will be called on every trade event for the securities you specify.  
def handle_data(context, data):
    """Record the account's current leverage under the name ``leverage``."""
    current_leverage = context.account.leverage
    record(leverage=current_leverage)

def before_trading_close(context, data):
    """Close every open position and compute each security's open-to-latest return.

    The returns end up in a local ``pd.Series``; nothing further is done
    with it in the code visible here.
    """
    # Target a zero weight for everything currently held.
    for held in context.portfolio.positions.keys():
        order_target_percent(held, 0)

    # Two daily bars of open and close prices; the last bar is today's.
    open_history = history(bar_count=2, frequency='1d', field='open_price')
    close_history = history(bar_count=2, frequency='1d', field='close_price')

    # Return from today's open to the latest close value, per security.
    intraday = {}
    for sid in data:
        if sid not in close_history:
            continue
        if sid not in open_history:
            continue
        latest_close = close_history[sid][-1]
        todays_open = open_history[sid][-1]
        intraday[sid] = (latest_close - todays_open) / todays_open
    returns_pct = pd.Series(intraday)
2 responses

Hi Travis,

This should be relatively simple to do using Pipeline as you mentioned! I think you might be missing some of the basics on how the Pipeline API works.

I would recommend taking a look at this example, as well as this one to get a better sense of how it works!

Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

Thank you, Jamie - I think I've got it mostly sorted out now. Here's my code. The results are in alphabetical order, but it sorts through 8300+ securities.

from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.factors import SimpleMovingAverage  
from quantopian.pipeline import CustomFactor  
import numpy as np  
import pandas as pd  
import datetime  
class PriceRange(CustomFactor):
    """Custom factor that outputs the last entry of the close-price window."""

    # Data fed to ``compute`` as the ``close`` argument.
    inputs = [USEquityPricing.close]

    def compute(self, today, assets, out, close):
        """Write the final entry of ``close`` into every slot of ``out``."""
        latest = close[-1]
        out[:] = latest
def initialize(context):
    """Attach a pipeline named 'pipeline' that carries one ``PriceRange`` column.

    The column is also registered under the name 'pipeline'; the pipeline
    output is later read back using that same name.
    """
    context.stocks = []
    latest_close_factor = PriceRange(window_length=10)
    # attach_pipeline hands back the pipeline object the column is added to.
    attached = attach_pipeline(Pipeline(), name = 'pipeline')
    attached.add(latest_close_factor, 'pipeline')

def before_trading_start(context, data):
    """Refresh the universe from the 'pipeline' output before trading starts.

    Stores every security found in ``data`` on ``context.stocks``, prints the
    first 30 pipeline rows for each security priced under 5, and then sets
    the universe to the first 10 securities after sorting the results by the
    'pipeline' column in ascending order.
    """
    context.stocks = list(data)
    # Access results using the name passed to `attach_pipeline`.
    results = pipeline_output('pipeline')
    for stock in context.stocks:
        if data[stock].price < 5:
            # NOTE(review): the same 30 rows are printed once per security
            # priced under 5 -- confirm this repetition is intended.
            # Parenthesised single-argument form works as a Python 2 print
            # statement and as a Python 3 function call.
            print(results.iloc[:30])

    # DataFrame.sort() does not exist in newer pandas; prefer sort_values()
    # and keep sort() only as a fallback for old versions.
    if hasattr(results, 'sort_values'):
        ranked = results.sort_values(by='pipeline')
    else:
        ranked = results.sort('pipeline')

    # Define a universe with the results of a Pipeline.
    update_universe(ranked.index[:10])
def handle_data(context, data):
    # Binds the built-in type `int` to a local name that is not used in the
    # lines visible here.
    # NOTE(review): this looks like a placeholder body -- confirm whether the
    # function continues beyond this excerpt.
    variable = int