I've been doing a lot of research on the Pipeline API, and apparently it should be able to screen the entire Quantopian database. The furthest I've gotten is having Pipeline sort through about 5,000 securities. The goal is to have it check the entire universe and then screen out securities based on set parameters. Realistically, I'd like to call this once per day and compare the previous day's close price with today's current price. It seems that I can only get it to sort based on fundamental data, whereas I would like an unbiased look at every security.
Can this be done? What am I missing here?
import math
import random
import datetime
import pandas as pd
import numpy as np
from sqlalchemy import or_
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.factors import Latest
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data.morningstar import valuation
class UniverseProvider(object):
    """Hand out a large result frame in consecutive fixed-size batches.

    A subclass supplies the full frame through ``get_output``. Each call to
    ``next`` slices the following ``how_many_results`` rows off that frame and
    exposes them through ``get_results`` (the rows) and ``get_sids`` (the row
    index as a list). Once the frame has been consumed it is dropped, so the
    following ``next`` call fetches a fresh one and starts over.

    The frame can optionally be shuffled, or sorted by ``sort_columns``,
    once per fetch; shuffling takes precedence over sorting.
    """

    def __init__(self, shuffle_results = False, sort_columns = None, ascending = False):
        self.shuffle_results = shuffle_results
        self.sort_columns = sort_columns
        self.ascending = ascending
        self.flush()

    def flush(self):
        """Forget the cached frame and the last batch; return ``self``."""
        self.results = None
        self.stocks = None
        self.output = None
        # Initialised here so the attribute exists before the first ``next``.
        self.output_used = 0
        return self

    def set_shuffle_results(self, shuffle_results):
        """Enable or disable shuffling for subsequently fetched frames."""
        self.shuffle_results = shuffle_results
        return self

    def set_sort_columns(self, sort_columns, ascending = False):
        """Set the sort column(s) and direction for subsequently fetched frames.

        ``ascending`` defaults to ``False``, matching the constructor.
        """
        self.sort_columns = sort_columns
        self.ascending = ascending
        return self

    def get_results(self):
        """Return the rows selected by the most recent ``next`` call."""
        return self.results

    def get_sids(self):
        """Return the index labels of the most recent batch as a list."""
        return self.stocks

    def next(self, how_many_results):
        """Select the next ``how_many_results`` rows and return ``self``.

        Fetches (and shuffles or sorts) a new frame when none is cached. The
        last batch of a frame may be shorter than requested.
        """
        if self.output is None:
            df = self.get_output()
            if self.shuffle_results:
                df = df.reindex(index=np.random.permutation(df.index))
            elif self.sort_columns is not None:
                df = self._sorted(df)
            self.output = df
            self.output_used = 0

        start = self.output_used
        end = start + how_many_results
        self.results = self.output.iloc[start:end, :]
        self.stocks = list(self.results.index)
        log.debug('UniverseProvider retrieved stocks %d, selected %d, offset %d' % (len(self.output.index), len(self.stocks), start))

        self.output_used = end
        if self.output_used >= len(self.output.index):
            # Frame exhausted: drop it so the next call fetches a new one.
            self.output = None
        return self

    def _sorted(self, df):
        """Return ``df`` ordered by the configured columns and direction."""
        # ``DataFrame.sort`` was deprecated in pandas 0.17 and removed in
        # 0.20; prefer ``sort_values`` and fall back only on old pandas.
        if hasattr(df, 'sort_values'):
            return df.sort_values(by=self.sort_columns, ascending=self.ascending)
        return df.sort(columns=self.sort_columns, ascending=self.ascending)

    def get_output(self):
        """Return the full frame to batch over; subclasses must override."""
        raise NotImplementedError("Subclass must implement 'get_output' method")
class GetFundamentals(UniverseProvider):
    """Universe provider whose frame comes from a ``get_fundamentals`` query."""

    def __init__(self, query, filter_ordered_nulls = True):
        # Store the query settings, then let the base class set its defaults.
        self.query = query
        self.filter_ordered_nulls = filter_ordered_nulls
        UniverseProvider.__init__(self)

    def get_output(self):
        """Run the stored query and return its result transposed.

        NOTE(review): the transpose presumably puts securities on the row
        index, which the batching in the base class slices on -- confirm
        against the ``get_fundamentals`` documentation.
        """
        return get_fundamentals(self.query, self.filter_ordered_nulls).transpose()
class PipelineOutput(UniverseProvider):
    """Universe provider whose frame is the output of a named pipeline."""

    def __init__(self, pipeline_name):
        # Remember which pipeline to read, then apply the base-class defaults.
        self.pipeline_name = pipeline_name
        UniverseProvider.__init__(self)

    def get_output(self):
        """Return ``pipeline_output`` for the stored pipeline name, unchanged."""
        return pipeline_output(self.pipeline_name)
# Put any initialization logic here. The context object will be passed to
# the other methods in your algorithm.
def initialize(context):
    """Create the fundamentals-backed universe provider and schedule the daily job.

    The provider is stored on ``context.universe``; ``before_trading_close``
    is scheduled for every day with the ``market_close(minutes=60)`` time
    rule, half days included.
    """
    # Fundamentals fields to fetch, largest market cap first.
    universe_query = (
        query(
            fundamentals.valuation.market_cap,
            fundamentals.valuation.shares_outstanding,
            fundamentals.company_reference.primary_exchange_id,
            fundamentals.company_reference.country_id
        )
        # Optional narrowing filters, currently disabled:
        #.filter(fundamentals.company_reference.country_id == "USA")
        #.filter(or_(fundamentals.company_reference.primary_exchange_id == "NAS", fundamentals.company_reference.primary_exchange_id == "NYS"))
        .order_by(fundamentals.valuation.market_cap.desc())
    )
    context.universe = GetFundamentals(universe_query)

    schedule_function(
        func=before_trading_close,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(minutes=60),
        half_days=True
    )
def before_trading_start(context, data):
    """Pull the next 500 securities from the provider and make them the universe."""
    batch = context.universe.next(500)
    update_universe(batch.get_sids())
# Will be called on every trade event for the securities you specify.
def handle_data(context, data):
    """Record the account leverage under the ``leverage`` series name."""
    current_leverage = context.account.leverage
    record(leverage=current_leverage)
def before_trading_close(context, data):
#
# close positions
#
for sid in context.portfolio.positions.keys():
order_target_percent(sid, 0)
# get today open anc current price
open_history = history(bar_count=2, frequency='1d', field='open_price')
close_history = history(bar_count=2, frequency='1d', field='close_price')
# calculate daily returns up to this minute
returns_pct = {sid : ((close_history[sid][-1] - open_history[sid][-1]) / open_history[sid][-1]) for sid in data if (sid in close_history and sid in open_history) }
returns_pct = pd.Series(returns_pct)