Quantopian's community platform is shutting down. Please read this post for more information and download your code.
Back to Community
Pipeline and history function

Good afternoon,
Thanks to this paper: http://www.sciencedirect.com/science/article/pii/S1059056016301563
I developed this strategy in which I predict the direction of the overnight SPY and of the first 30 minutes of trading based on the last 30 minutes of the day before.

This is my code:

from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline import CustomFactor  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.data import morningstar  
import numpy as np  
import pandas as pd  
from datetime import date, timedelta  
from quantopian.pipeline.filters.morningstar import Q500US

def initialize(context):  
    #schedule function so that I open positions before market closes  
    schedule_function(rebalance1, date_rules.every_day(), time_rules.market_close(minutes=1))  
    #schedule function to change position  
    #when the market opens the next morning  
    schedule_function(rebalance2, date_rules.every_day(), time_rules.market_open(minutes=1))  
    #schedule function to close all positions 30 minutes  
    #after the market opens everyday  
    schedule_function(rebalance3, date_rules.every_day(), time_rules.market_open(minutes=31))  
    #leverage for long positions  
    context.long_leverage = 1  
    #leverage for short positions  
    context.short_leverage = -1  
    #create pipeline  
    my_pipe = make_pipeline()  
    attach_pipeline(my_pipe, 'my_pipeline')  
def make_pipeline():  
    # Base universe set to the Q1500US.  
    base_universe = Q500US()  
    # find close minute of trading and  
    last1min_before_clos = data.history(base_universe, 'close', 1, '1m')  
    last30min_before_clos = data.history(base_universe, 'close', 30, '1m')  
    #see where we have a gain in the last 30 minutes of trading  
    increasing_last30mins = last1min_before_clos > last30min_before_clos  
    # Filter to select securities to long.  
    longs_overnight = increasing_last30mins.top(25)  
    # Filter to select securities to short.  
    shorts_overnight = increasing_last30mins.bottom(25)  
    # Get today's open  and yesterday's close prices  
    today_open = data.history(base_universe, 'open', 1, '1d')  
    yesterday_close = data.history(base_universe,'close', 2, '1d')  
    #see where we have a gain overnight  
    increasing_overnight = today_open > yesterday_close  
    # Filter to select securities to long.  
    longs_morning = increasing_overnight.top(25)  
    # Filter to select securities to short.  
    shorts_morning = increasing_overnight.bottom(25)  
    # Filter for all securities that we want to trade.  
    securities_to_trade = (longs_overnight | shorts_overnight | longs_morning | shorts_morning)  
    pipe = Pipeline(  
        columns={  
            'longs_overnight': longs_overnight,  
            'shorts_overnight': shorts_overnight,  
            'longs_morning': longs_morning,  
            'shorts_morning': shorts_morning,  
        },  
        screen=securities_to_trade,  
    )

def before_trading_start(context, data):  
    # Call pipelive_output to get the output  
    context.output = pipeline_output('stocks_500')  
    Olongs_list = context.output[0]  
    Oshorts_list = context.output[1]  
    Mlongs_list = context.output[2]  
    Mshorts_list = context.output[3]  
#This function rebalances my portfolio before the market closes  
#I go long if the "last 30 minutes of trading" are positive and  
#I short otherwise.  
#This happens only for the best-performing 5% stocks and  
#for the worst-performing 5% stocks.  
def rebalance1(context, data):  
    #Compute weights  
    long_weight = context.long_leverage / float(len(Olongs_list))  
    short_weight = context.short_leverage / float(len(Oshorts_list))  
    #If last "30 minutes of trading" returns are positive I go long  
    #based on my long list, otherwise I don't invest.  
    for Olong in Olongs_list:  
        if Olong > 0:  
            order_target_percent(pipe.Olongs_list, long_weight)  
        else:  
            order_target_percent(pipe.Olongs_list, 0)  
    #If last "30 minutes of trading" returns are negative I go short  
    #based on my short list, otherwise I don't invest.  
    for Oshort in Oshorts_list:  
        if Oshort < 0:  
            order_target_percent(pipe.Olongs_list, short_weight)  
        else:  
            order_target_percent(pipe.Olongs_list, 0)

#This function rebalances my portfolio when the market opens  
#I go long if the "overnight returns" are positive and  
#I go short otherwise.  
#This happens only for the best-performing 5% stocks and  
#for the worst-performing 5% stocks  
def rebalance2(context, data):  
    #compute weights  
    long_weight = context.long_leverage / float(len(Mlongs_list))  
    short_weight = context.short_leverage / float(len(Mshorts_list))  
    #If overnight returns are negative go long based on my  
    #long list, otherwise I don't invest.  
    for Mlong in Mlongs_list:  
      if Mlong < 0:  
          order_target_percent(pipe.Mlongs_list, long_weight)  
      else:  
          order_target_percent(pipe.Mlongs_list, 0)  
    #If overnight returns are positive go short based on my  
    #short list, otherwise I don't invest.  
    for Mshort in Mshorts_list:  
      if Mshort > 0:  
          order_target_percent(pipe.Mshorts_list, short_weight)  
      else:  
          order_target_percent(pipe.Mshorts_list, 0)  
#This function rebalances my portfolio by closing all positions  
#30 minutes after the market opens in the next morning  
def rebalance3(context, data):  
    #I close every position 30 minutes after the market opens  
    for stock in base_universe:  
            order_target(stock, 0)  

I am kind of new to the use of pipeline, but here is where I got stuck:
My issue comes in line 34: I don't understand why, but it looks like the history function doesn't seem to work properly in pipeline.

Am I just mistaking somewhere or is there an alternative method that I am supposed to use to get the prices in pipeline for a specific minute of the day? How can I solve this problem?
Thanks in advance for the help,
Mattia

4 responses

Pipeline is simply an optimized way to retrieve daily data. It can't be used to access minute data. Use the '.history' method within an algorithm to get minute data. The two methods are separate. 'pipeline_output' to get daily data (price, volume, fundamental, and a number of other data sets). 'history' or 'current' to get minute data (price and volume data only). Maybe take a look at this post for some insight on what pipeline actually does https://www.quantopian.com/posts/need-help-with-pipeline-just-a-beginner .

Attach a backtest of your algorithm instead of pasting code please. If the algorithm doesn't run well enough to even backtest, simply comment out the offending lines.

Hi Dan,
First of all thank for the link, it helped me have a better overview on how the pipeline actually works and why we use it.
However, I am still struggling on understanding how I can insert the correct timing into my code.
For example:
On the one hand, when I seek the closing price or the opening price, if I am not mistaking, I could simply substitute this:

today_open = data.history(base_universe, 'open', 1, '1d')  

in the make_pipeline function with:

today_open = [USEquityPricing.open]  

and I should obtain what I have been looking for.
On the other hand, when it comes to define to the the "30th minute before the trading closes" which is the minute 15.30 it's a different story.
I tried to define it as a custom class like this:

class Minute30_before_close(CustomFactor):  
    last30min_before_clos = data.history(USEquityPricing, 'close', 30, '1m')  
    def compute(self, today, asset_ids, out, values):  
       out[:] = last30min_before_clos  

So that I could define the algorithm externally as a Custom Factor and then insert it into the pipeline table, but still it doesn't seem to be working.

"when I seek the closing price or the opening price, if I am not mistaking, I could simply substitute this:"

today_open = data.history(base_universe, 'open', 1, '1d')  

Yes, this will return todays open prices when this is run in 'handle_data' or a scheduled function. 'today_open' will be a pandas series or dataframe (see the documentation https://www.quantopian.com/help#api-data-history ) However, the following code

today_open = [USEquityPricing.open]  

doesn't actually get any data at all. 'USEquityPricing' is a dataset object. Take a look at the documentation (https://www.quantopian.com/help#initializing-a-pipeline) and read it carefully.

The most important thing to understand about DataSets is that they do
not hold actual data. DataSets are simply collections of objects that
tell the Pipeline API where and how to find the inputs to
computations.

So, back to your original question. If you want to get minute data then simply use the 'history' method.

last_30_min_of_data = data.history(my_security_list, 'close', 30, '1m')  

Place this in a scheduled function. The pipeline concepts like factors, custom factors, are COMPLETELY separate. Conversely, one can't use the history function inside a CustomFactor definition.

I would recommend using pipeline to simply select the universe of stocks. Don't actually fetch any data using pipeline since the only data you are interested in is minute data. Use the 'history' method to get all the data.

Attached is an example which may help you get started. I didn't exactly understand your strategy so it mostly is to demonstrate the use of 'history' and some of the pandas methods to check for returns etc.

Good luck.

Hi Dan,
Thanks again for your help, I just changed a couple of details in the code for which I haven't been super clear about.
I just make sure I understood completely:
1) So in my case the pipeline is just the "box" that locks-in the stocks I want to work with. In practice, I insert all my data separately without the need of fetching them directly with the pipeline.
2) why in your code, when you work in the "market_close_trades" function, you use:

 get close prices for last 30 minutes  
    close_prices = data.history(context.base_universe, 'close', 10, '1m')  
    # Find the 30 minute percent return using the dataframe method 'pct_change'  
    returns = close_prices.pct_change(5, freq = 'min')  

to get the last 30 minutes of trading returns and not something like this?

# get close prices for last 30 minutes  
    close_prices = data.history(context.base_universe, 'close', 30, '1m')  
    # Find the 30 minute percent return using the dataframe method 'pct_change'  
    returns = close_prices.pct_change(30, freq = 'min')