Quantopian's community platform is shutting down. Please read this post for more information and download your code.
Back to Community
Seem to be having trouble with Pipeline Output?

Hey guys,

Having Trouble with Pipeline output.

Error says

TypeError: zipline.pipeline.pipeline.validate_column() expected a value of type zipline.pipeline.term.Term for argument 'term', but got list instead.

And the line of code it says is incorrect is:

    pipe = Pipeline(columns = pipe_columns,screen = pipe_screen)  

Here's the code I'm getting an error on:

import talib  
import numpy as np  
import math  
import pandas as pd  
from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.factors import AverageDollarVolume  
from quantopian.pipeline.filters import Q1500US  
from quantopian.pipeline.filters import Q500US  
from quantopian.pipeline.data import morningstar  
from quantopian.pipeline.data import Fundamentals

class ADX_Dir_Ind():  
    inputs=[USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]  
    true_length=14  
    window_length=true_length+true_length  
    def compute(self, today, assets, out, high, low, close):  
        anynan =np.isnan(close).any(axis=0)  
        for col_ix, have_nans in enumerate(anynan):  
            if have_nans:  
                out[col_ix] = np.nan  
                continue  
            results = talib.ADX(  
                high[:, col_ix],  
                low[:, col_ix],  
                close[:, col_ix],  
                timeperiod=self.true_length)  
            out[col_ix] = results[-1]  
class MarketCap():  
    # Pre-declare inputs and window_length  
    inputs = [USEquityPricing.close, Fundamentals.shares_outstanding]  
    window_length = 1

    # Compute market cap value  
    def compute(self, today, assets, out, close, shares):  
        out[:] = close[-1] * shares[-1]

def initialize(context):  
    ADX = ADX_Dir_Ind()  
    MktCap = Fundamentals.market_cap.latest  
    context.b = sid(8554)  
    set_benchmark(context.b)

    # Create a screen to remove penny stocks  
    remove_penny_stocks = [USEquityPricing.close] > 15

    # Rank a factor using a mask to ignore the values we're  
    # filtering out by passing mask=remove_penny_stocks to rank.

    pipe_columns = {  
        'Price':[USEquityPricing.close],  
        'ADX':ADX,  
        'Mkt Cap':MktCap  
    }  
    Screen = ((ADX<20) & (remove_penny_stocks) & (MktCap>500000000))

    # Use multiple screens to narrow the universe  
    pipe_screen = (Screen)

    pipe = Pipeline(columns = pipe_columns,screen = pipe_screen)  
    attach_pipeline(pipe, 'example')  
    """schedule_function(trade, date_rules.every_day(), time_rules.market_open())"""  
def before_trading_start(context, data):  
    output = pipeline_output('example')  
    context.my_securities = output.sort('ADX', ascending=True).iloc[:20]  
    print len(context.my_securities)

    context.security_list = context.my_securities.index  
    log.info("\n" + str(context.my_securities.head(5)))  

Any Ideas why it's not working? shouldn't it be returning a list?

10 responses

Not sure if this is the only problem but you will need to add the 'latest' method and remove the brackets to the close prices. Like this.

# Create a screen to remove penny stocks  
remove_penny_stocks = USEquityPricing.close.latest> 15  
pipe_columns = {  
    'Price':USEquityPricing.close.latest,  
    'ADX':ADX,  
    'Mkt Cap':MktCap  
    } 

Hey Dan, thanks for the reply. Did what you said, got a different error this time. Here is what the error said:

TypeError: zipline.pipeline.pipeline.validate_column() expected a value of type zipline.pipeline.term.Term for argument 'term', but got instance instead.

Error is still in the same line as before. Here is the code:

import talib  
import numpy as np  
import math  
import pandas as pd  
from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.factors import AverageDollarVolume  
from quantopian.pipeline.filters import Q1500US  
from quantopian.pipeline.filters import Q500US  
from quantopian.pipeline.data import morningstar  
from quantopian.pipeline.data import Fundamentals

class ADX_Dir_Ind():  
    inputs=[USEquityPricing.high.latest,USEquityPricing.low.latest,USEquityPricing.close.latest]  
    true_length=14  
    window_length=true_length+true_length  
    def compute(self, today, assets, out, high, low, close):  
        anynan =np.isnan(close).any(axis=0)  
        for col_ix, have_nans in enumerate(anynan):  
            if have_nans:  
                out[col_ix] = np.nan  
                continue  
            results = talib.ADX(  
                high[:, col_ix],  
                low[:, col_ix],  
                close[:, col_ix],  
                timeperiod=self.true_length)  
            out[col_ix] = results[-1]  
class MarketCap():  
    # Pre-declare inputs and window_length  
    inputs = [USEquityPricing.close.latest, Fundamentals.shares_outstanding]  
    window_length = 1

    # Compute market cap value  
    def compute(self, today, assets, out, close, shares):  
        out[:] = close[-1] * shares[-1]

def initialize(context):  
    ADX = ADX_Dir_Ind()  
    MktCap = Fundamentals.market_cap.latest  
    context.b = sid(8554)  
    set_benchmark(context.b)

    # Create a screen to remove penny stocks  
    remove_penny_stocks = USEquityPricing.close.latest > 15

    # Rank a factor using a mask to ignore the values we're  
    # filtering out by passing mask=remove_penny_stocks to rank.

    pipe_columns = {  
        'Price':USEquityPricing.close.latest,  
        'ADX':ADX,  
        'Mkt Cap':MktCap  
    }  
    Screen = ((ADX<20) & (remove_penny_stocks) & (MktCap>500000000))

    # Use multiple screens to narrow the universe  
    pipe_screen = (Screen)

    pipe = Pipeline(columns = pipe_columns,screen = pipe_screen)  
    attach_pipeline(pipe, 'example')  
    """schedule_function(trade, date_rules.every_day(), time_rules.market_open())"""  
def before_trading_start(context, data):  
    output = pipeline_output('example')  
    context.my_securities = output.sort('ADX', ascending=True).iloc[:20]  
    print len(context.my_securities)

    context.security_list = context.my_securities.index  
    log.info("\n" + str(context.my_securities.head(5)))  

Thanks again for the help

Could you attach a backtest. Even if you comment out the whole thing to get it to run. It makes it much easier to troubleshoot.

Regards

Yea absolutely, my bad about that.

Didn't know if you'd prefer the code or the backtest!

Hope this helps :)

Two issues.

A little over zealous with the 'latest' method. When giving inputs to a custom factor one specifies the dataset. Think of this as defining the field in a database but not specifying the actual data from that field. Adding the '.latest' method creates a factor which can be thought of as the actual value (not just the field). So, the input specification should be

    inputs=[USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]  

The other issue is that custom factors need to be defined as a subclass of 'CustomFactor'. So, the custom factors should be specified like this

class ADX_Dir_Ind(CustomFactor):  
# and  
class MarketCap(CustomFactor):

Remember to import the 'CustomFactor' class.

That get's your algorithm running. Not sure it's doing what you want but there shouldn't be errors.

Attached is the updated algo.

Good luck.

Hey dan, Thanks for your help again, pipeline is running beautifully. I seem to be having trouble with my TA-Lib implementation though after getting my results. Any idea why I'm getting the "High has wrong dimension" error?

The error "high has wrong dimensions" means the input parameter "high" to the talib.PLUS_DI method has 2 dimensions and it's expecting a single dimension (ie a series). Actually you will run into issues with all the parameters and all three of the ta-lib methods you are using.

Basically, all the ta-lib methods work on a single security at a time and generally expect a single series for each parameter. The 'data.history' methods in the statements just before the error all return 2 dimensional dataframes organized with dates as rows (axis 0) and securities for columns (axis 1). The securities are all the securities you have in "context.squeeze". So, you are passing data for multiple securities to a ta-lib method but it expects data for just a single security.

A solution would be to loop over all the securities in "context.squeeze" and pass just the data column associated with a single security each time through the loop. However, a more efficient (ie runs faster) solution would be to convert the two PLUS_DI and MINUS_DI ta-lib indicators to CustomFactors just as was done for ADX. Also, use the pipeline value for ADX that's already been calculated. No need to recalculate it (though I did notice the window lengths were different if that make a difference).

Not related to this error is a problem further down in the code. Not a big one but will need to be fixed. Change 'context.stock' to 'stock' in the following

    for stock in context.squeeze:  
        if nDI < pDI and ADX >20 and sqzOn:  
            if stock not in open_orders and data.can_trade(context.stock):  
                order_target_percent(context.stock, 1)  
        if nDI > pDI and ADX >20 and sqzOn:  
            if stock not in open_orders and data.can_trade(context.stock):  
                order_target_percent(context.stock, -1)  
        if sqzOff:  
            if stock not in open_orders and data.can_trade(context.stock):  
                order_target_percent(context.stock, 0)

should be

    for stock in context.squeeze:  
        if nDI < pDI and ADX >20 and sqzOn:  
            if stock not in open_orders and data.can_trade(stock):  
                order_target_percent(stock, 1)  
        if nDI > pDI and ADX >20 and sqzOn:  
            if stock not in open_orders and data.can_trade(stock):  
                order_target_percent(stock, -1)  
        if sqzOff:  
            if stock not in open_orders and data.can_trade(stock):  
                order_target_percent(stock, 0)  

Hope that helps.

Hey Dan, Why am I getting all these build errors with my variables and any idea what this "TypeError: 'ABCMeta' object is not iterable" Means?

That is an unfortunately cryptic error message. It stems from the two lines in your code

    pDI = PLUS_D(CustomFactor)  
    nDI = MINUS_D(CustomFactor)

Those should be

    pDI = PLUS_D()  
    nDI = MINUS_D()

One uses 'CustomFactor' when defining a class (ie one of your custom factors) to tell it that you want the class to be based upon the CustomFactor class. However, when one instantiates that class, what goes inside the parenthesis are any parameters one want to pass when creating the instance.

So, remove the 'CustomFactor' and that error should go away. I believe you will get a some other errors, but that one will be fixed.

Good luck.

Dan is rlly out hear helping everyone