Notebook
In [63]:
from quantopian.pipeline import Pipeline
from quantopian.research import run_pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import Returns
from quantopian.pipeline.factors import ExponentialWeightedMovingAverage
from quantopian.pipeline.filters.morningstar import Q1500US
from quantopian.pipeline import CustomFactor
import pandas as pd
import numpy as np


class upper_partial_moment(CustomFactor):
    def compute(self, today, assets, out, returns):
        r = np.where(returns > 0, returns, 0)
        n = np.count_nonzero(r, axis = 0)
        diff = np.apply_along_axis(abs_diff, 0, r)
        out[:] = np.power((diff.sum(axis = 0)/n), 0.5)
        
class lower_partial_moment(CustomFactor):
    def compute(self, today, assets, out, returns):
        r = np.where(returns < 0, returns, 0)
        n = np.count_nonzero(r, axis = 0)
        diff = np.apply_along_axis(abs_diff, 0, r)
        out[:] = np.power((diff.sum(axis = 0)/n), 0.5)
        
def abs_diff(r):
    return ((r - 0)**2)

class my_signal(CustomFactor):
    def compute(self, today, assets, out, UPM, LPM):
        my_signal = UPM / LPM - 1
        out[:]=ExponentialWeightedMovingAverage(inputs=[my_signal], window_length=9, decay_rate=0.90)
        
def make_pipeline():
    base_universe = Q1500US()
    
    returns = Returns(window_length=100)
    
    UPM = upper_partial_moment(inputs = [returns], window_length=30, mask=base_universe)
    LPM = lower_partial_moment(inputs = [returns], window_length=30, mask=base_universe)
    
    signal = my_signal(inputs = [UPM, LPM], window_length=10, mask=base_universe)
    
    return Pipeline(columns={'Returns':returns, 'UPM':UPM, 'LPM':LPM, 'Signal':signal}, screen=(base_universe))

result = run_pipeline(make_pipeline(), '2017-01-01', '2017-09-30')

NonWindowSafeInputTraceback (most recent call last)
<ipython-input-63-675424e8c427> in <module>()
     44     return Pipeline(columns={'Returns':returns, 'UPM':UPM, 'LPM':LPM, 'Signal':signal}, screen=(base_universe))
     45 
---> 46 result = run_pipeline(make_pipeline(), '2017-01-01', '2017-09-30')

<ipython-input-63-675424e8c427> in make_pipeline()
     40     LPM = lower_partial_moment(inputs = [returns], window_length=30, mask=base_universe)
     41 
---> 42     signal = my_signal(inputs = [UPM, LPM], window_length=10, mask=base_universe)
     43 
     44     return Pipeline(columns={'Returns':returns, 'UPM':UPM, 'LPM':LPM, 'Signal':signal}, screen=(base_universe))

/build/src/qexec_repo/zipline_repo/zipline/pipeline/mixins.pyc in __new__(cls, inputs, outputs, window_length, mask, dtype, missing_value, ndim, **kwargs)
    138             missing_value=missing_value,
    139             ndim=ndim,
--> 140             **kwargs
    141         )
    142 

/build/src/qexec_repo/zipline_repo/zipline/pipeline/term.pyc in __new__(cls, inputs, outputs, window_length, mask, *args, **kwargs)
    472             mask=mask,
    473             window_length=window_length,
--> 474             *args, **kwargs
    475         )
    476 

/build/src/qexec_repo/zipline_repo/zipline/pipeline/term.pyc in __new__(cls, domain, dtype, missing_value, window_safe, ndim, *args, **kwargs)
    129                     ndim=ndim,
    130                     params=params,
--> 131                     *args, **kwargs
    132                 )
    133             return new_instance

/build/src/qexec_repo/zipline_repo/zipline/pipeline/term.pyc in _init(self, inputs, outputs, window_length, mask, *args, **kwargs)
    480         self.window_length = window_length
    481         self.mask = mask
--> 482         return super(ComputableTerm, self)._init(*args, **kwargs)
    483 
    484     @classmethod

/build/src/qexec_repo/zipline_repo/zipline/pipeline/term.pyc in _init(self, domain, dtype, missing_value, window_safe, ndim, params)
    267         # should set this flag to True.
    268         self._subclass_called_super_validate = False
--> 269         self._validate()
    270         assert self._subclass_called_super_validate, (
    271             "Term._validate() was not called.\n"

/build/src/qexec_repo/zipline_repo/zipline/pipeline/mixins.pyc in _validate(self)
     40     """
     41     def _validate(self):
---> 42         super(PositiveWindowLengthMixin, self)._validate()
     43         if not self.windowed:
     44             raise WindowLengthNotPositive(window_length=self.window_length)

/build/src/qexec_repo/zipline_repo/zipline/pipeline/mixins.pyc in _validate(self)
     85 
     86     def _validate(self):
---> 87         super(RestrictedDTypeMixin, self)._validate()
     88         assert self.ALLOWED_DTYPES is not NotSpecified, (
     89             "ALLOWED_DTYPES not supplied on subclass "

/build/src/qexec_repo/zipline_repo/zipline/pipeline/term.pyc in _validate(self)
    538             for child in self.inputs:
    539                 if not child.window_safe:
--> 540                     raise NonWindowSafeInput(parent=self, child=child)
    541 
    542     def _compute(self, inputs, dates, assets, mask):

NonWindowSafeInput: Can't compute windowed expression my_signal((upper_partial_moment((Returns((USEquityPricing.close::float64,), window_length=100),), window_length=30), lower_partial_moment((Returns((USEquityPricing.close::float64,), window_length=100),), window_length=30)), window_length=10) with windowed input upper_partial_moment((Returns((USEquityPricing.close::float64,), window_length=100),), window_length=30).