How to implement moving average of a pipeline factor?

Hello,

I would like to compute a moving average of certain factors in pipeline. For example, I want the 90-day moving average of the 14-day RSI as a factor. How should I do this within the pipeline framework?

I have two possible solutions in mind:
1) Create a custom factor that uses some kind of "for loop" (window_length = 104 etc.) to calculate the RSI for each date and then averages the last 90 days. But this approach does not leverage the built-in RSI factor.
2) Create a variable (90 x n dimension) that stores the RSI values (calculated from the built-in RSI factor) for the last 90 days, and then calculate the average when needed.
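
Option 1 can be sketched outside of pipeline with plain NumPy for a single asset. This is only an illustration: `simple_rsi` and `mean_rsi` are hypothetical helper names, and the RSI here uses plain sums of gains and losses rather than Wilder smoothing, so it may not match the built-in factor exactly. It also shows where a window of 104 comes from: the first RSI value needs 15 closes (14 changes), and each of the remaining 89 values needs one more close.

```python
import numpy as np


def simple_rsi(closes, period=14):
    """RSI from the last `period` price changes, using plain sums (no smoothing)."""
    deltas = np.diff(closes[-(period + 1):])
    gains = np.where(deltas > 0, deltas, 0.0).sum()
    losses = np.where(deltas < 0, -deltas, 0.0).sum()
    if gains + losses == 0:
        return 50.0  # flat prices: treat as neutral (an assumption of this sketch)
    # Equivalent to 100 - 100 / (1 + gains / losses), without dividing by zero.
    return 100.0 * gains / (gains + losses)


def mean_rsi(closes, rsi_period=14, ma_period=90):
    """Average of the last `ma_period` daily RSI values for one asset."""
    closes = np.asarray(closes, dtype=float)
    needed = rsi_period + ma_period  # 14 + 90 = 104 closes
    if len(closes) < needed:
        raise ValueError("need at least %d closes" % needed)
    first_end = len(closes) - ma_period + 1
    values = [simple_rsi(closes[:end], rsi_period)
              for end in range(first_end, len(closes) + 1)]
    return float(np.mean(values))
```

Inside a CustomFactor the same loop would have to run over a 2D window (days x assets), which is why this approach is simple but slow.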

And this doesn't work (I am not very familiar with Python, so maybe this is obviously wrong):

rsi_ma = SimpleMovingAverage(inputs=[RSI(inputs=[USEquityPricing.close]], window_length=14), window_length=90)  

Thank you for your help!
Bing

10 responses

Is there a best practice for how to approach this?

It is not so easy to implement what you need (see this feature request here). I hope they'll improve the pipeline factor design eventually. Currently, if you want to create a moving average of RSI, you need to code it from scratch.

As an example, here are two factors I created to calculate the moving average of a percentage change:

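import numpy as np  
from quantopian.pipeline import CustomFactor  
from quantopian.pipeline.factors import SimpleMovingAverage, ExponentialWeightedMovingAverage  

# This is the initial factor; I'd like to calculate the moving average of this  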
class PercentChange(CustomFactor):  
    """  
    Calculates the percent change of input over the given window_length.  
    """  
    def compute(self, today, assets, out, data):  
        out[:] = (data[-1] - data[0]) / data[0]  

# As I cannot pass PercentChange as input to SimpleMovingAverage, I have to subclass SimpleMovingAverage  
class SimpleMovingAveragePercentChange(SimpleMovingAverage):  
    """  
    Average Value of an arbitrary column  
    **Default Inputs**: None  
    **Default Window Length**: None  
    """  
    def compute(self, today, assets, out, data):  
        prct_change = np.diff(data, axis = 0)   # absolute change  
        prct_change /= data[:-1]                # percentage change  
        SimpleMovingAverage.compute(self, today, assets, out, prct_change)   

# The same applies to ExponentialWeightedMovingAverage  
class ExponentialWeightedMovingAveragePercentChange(ExponentialWeightedMovingAverage):  
    """  
    Exponentially Weighted Moving Average  
    **Default Inputs:** None  
    **Default Window Length:** None  
    Parameters  
    ----------  
    inputs : length-1 list/tuple of BoundColumn  
        The expression over which to compute the average.  
    window_length : int > 0  
        Length of the lookback window over which to compute the average.  
    decay_rate : float, 0 < decay_rate <= 1  
        Weighting factor by which to discount past observations.  
        When calculating historical averages, rows are multiplied by the  
        sequence::  
            decay_rate, decay_rate ** 2, decay_rate ** 3, ...  
    """  
    def compute(self, today, assets, out, data, decay_rate):  
        prct_change = np.diff(data, axis = 0)   # absolute change  
        prct_change /= data[:-1]                # percentage change  
        ExponentialWeightedMovingAverage.compute(self, today, assets, out, prct_change, decay_rate)
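
To see what the subclass trick does to the data, here is a small standalone NumPy sketch. It assumes the parent's `compute` reduces to a column-wise, NaN-aware mean over the window; `mean_percent_change` is a hypothetical name used only for this illustration.

```python
import numpy as np


def mean_percent_change(window):
    """Column-wise mean of day-over-day percent changes.

    `window` has shape (days, assets), like the `data` argument of compute().
    """
    window = np.asarray(window, dtype=float)
    # N rows of prices yield N - 1 rows of changes.
    changes = np.diff(window, axis=0) / window[:-1]
    return np.nanmean(changes, axis=0)


# Two assets over three days (made-up prices).
window = [[100.0, 50.0],
          [110.0, 50.0],
          [121.0, 25.0]]
result = mean_percent_change(window)  # roughly [0.10, -0.25]
```

Note that a window_length of N produces an average over N - 1 changes, so the window should be one longer than the number of changes you want averaged.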

Thank you very much, Luca, for your help.

I've come to the conclusion that Quantopian is not very well suited for technical analysis.
If you want to combine some factors like a MACD cross, a slow stochastic cross and a few others (the basic stuff, so to say), this doesn't work very well.
The built-in MACD and slow stochastic factors provide only the signal line and %K, and you cannot easily feed them into an SMA to calculate the other lines yourself, because that doesn't work. Determining a crossover is also not built in; it can be done, but only artificially, by overriding a CustomFactor.
If you combine a few factors and crossovers, you lose sight of what you're doing. And tools like this should make your life easier, not more difficult.
You could use the ta-lib library (possibly the best way, I think), but it is also very badly integrated, so you need some tricks there as well, such as looping over the data.

Also, having two possible ways to do things, handle_data (which is not recommended but works much better with ta-lib) and the pipeline, is confusing.
All in all, Quantopian can be useful for some tasks, but I would not recommend it for technical analysis.

Do people agree, or am I missing something? (Sorry, this is probably not the right place and the reply is too long, but I have been struggling for days.)

Yes, it is difficult. For me, it would help to have more complex custom factor examples. While this group probably isn't going to go crazy for technical analysis, I think being able to create more complex factors derived from price would be helpful. Maybe the Quantopian folks could throw out a few curated examples?

My goal wouldn't be to do a bunch of technical analysis here but to marry some price studies with other factors and machine learning.

Hang in there Ber.

I had the same question / problem quite a while ago. At first I used the talib library as shown in this post. You can use the output of any indicator as input for another one. In the official docs you can see what indicators they provide. There's also a formula for most of them.

However, talib can be very slow since you must iterate over all assets in order to get the values. So I began writing my own indicators with the goals
a) to get the same or at least very similar results as with talib and
b) vectorize the calculations using mostly numpy and scipy to speed things up.
As a starting point I used this tutorial series by Sentdex. Although he uses for-loops in each one of them (again, for-loops = slow), it gives you good insight into how to implement indicators in Python. Once you understand the iterative code you can ask yourself "how could I get the same result using numpy arrays?" (You could also use pandas, which can be easier, but I often got timeout / memory errors using pandas in pipeline.) Sometimes it's as easy as searching the web for "numpy [some indicator]".
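
As a small illustration of that loop-to-NumPy translation, here is a simple moving average written both ways. This is a sketch with hypothetical function names, not code from the tutorial or from talib. Both versions accept a 1D array (one symbol) or a 2D array of shape (days, symbols), and assume at least `n` rows.

```python
import numpy as np


def sma_loop(prices, n):
    """Iterative SMA: one Python-level step per day."""
    prices = np.asarray(prices, dtype=float)
    out = np.full(prices.shape, np.nan)
    for t in range(n - 1, prices.shape[0]):
        out[t] = prices[t - n + 1:t + 1].mean(axis=0)
    return out


def sma_vectorized(prices, n):
    """Same result from a cumulative sum, with no Python loop over days."""
    prices = np.asarray(prices, dtype=float)
    csum = np.cumsum(prices, axis=0)
    out = np.full(prices.shape, np.nan)
    out[n - 1] = csum[n - 1] / n
    # Sum of rows t-n+1..t equals csum[t] - csum[t-n].
    out[n:] = (csum[n:] - csum[:-n]) / n
    return out
```

The first `n - 1` rows are NaN in both versions because the window is not yet full. One caveat of the cumulative-sum form: a NaN in the input propagates to every later row, so missing prices would need to be handled first.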

So for the original question, here's a custom factor calculating the 90-sma of the 14-rsi. It uses the functions "my_ema" (needed for rsi), "my_rsi" and "my_sma". Each one of them is completely vectorized and comes pretty close to the corresponding talib indicators (they only differ in the beginning of the time series, the later values are the same). They work for 1D-arrays (one single symbol) as well as for 2D-arrays (more symbols). If anyone is interested in other indicators, I can have a look if I already coded them...

# some imports in addition to the usual ones  
from quantopian.pipeline import CustomFactor  
from quantopian.pipeline.data.builtin import USEquityPricing  
import numpy as np  
from scipy.ndimage import convolve1d

def my_sma(cls, n):  
    conv1 = np.ones(n) / n  
    fill = int(n / 2)  
    result = convolve1d(cls, conv1, axis=0, mode='constant', cval=np.nan, origin=-fill)  
    return result

def my_ema(cls, n, alpha=None):  
    '''  
    Ok, I admit I haven't fully understood these calculations.  
    The code is from here:  
    https://stackoverflow.com/a/42926270  
    with a few modifications.  
    '''  
    if alpha is None:  
        alpha = 2 /(n + 1.0)

    alpha_rev = 1-alpha  
    n = cls.shape[0]

    pows = alpha_rev**(np.arange(n+1))  
    if cls.ndim > 1:  
        pows = np.tile(pows, (cls.shape[1],1)).T

    scale_arr = 1/pows[:-1]  
    offset = cls[0]*pows[1:]  
    pw0 = alpha*alpha_rev**(n-1)  
    mult = cls*pw0*scale_arr  
    cumsums = np.cumsum(mult, axis=0)  
    result = offset + cumsums*scale_arr[::-1]

    return result

def my_rsi(cls, n):  
    delta = np.diff(cls, axis=0)  
    if cls.ndim == 1:  
        delta = np.append(0, delta)  
    else:  
        zeros = np.zeros(cls.shape[1])  
        delta = np.vstack([zeros, delta])

    up, down = delta.copy(), -delta.copy()  
    up[up<0] = 0  
    down[down<0] = 0  
    # 1.0 / n keeps the division floating-point even under Python 2  
    rs = my_ema(up, n, alpha=1.0 / n) / my_ema(down, n, alpha=1.0 / n)  
    result = 100 - 100 / (1 + rs)  
    return result


class SmaRsi(CustomFactor):  
    inputs = [USEquityPricing.close]  
    window_length = 500  
    window_safe = True

    def compute(self, today, assets, out, c):  
        rsi = my_rsi(c, 14)  
        sma_rsi = my_sma(rsi, 90)

        out[:] = sma_rsi[-1] 
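
For readers puzzled by the arithmetic in "my_ema": as far as I can tell, it is a closed form of the usual recursive EMA, seeded with the first observation. The sketch below puts the plain recursion next to an algebraically equivalent cumulative-sum form (arranged slightly differently from the code above) so the two can be compared. `ema_loop` and `ema_closed_form` are hypothetical names for this illustration.

```python
import numpy as np


def ema_loop(x, alpha):
    """Recursive EMA: y[0] = x[0], y[t] = alpha * x[t] + (1 - alpha) * y[t-1]."""
    x = np.asarray(x, dtype=float)
    y = np.empty_like(x)
    y[0] = x[0]
    for t in range(1, x.shape[0]):
        y[t] = alpha * x[t] + (1.0 - alpha) * y[t - 1]
    return y


def ema_closed_form(x, alpha):
    """Same values without a loop.

    Unrolling the recursion gives
        y[t] = x[0] * d**(t+1) + alpha * sum_{i<=t} x[i] * d**(t-i),  d = 1 - alpha
    and the sum can be computed as cumsum(x / d**i) * d**t.
    """
    x = np.asarray(x, dtype=float)
    n = x.shape[0]
    decay = 1.0 - alpha
    # Shape (n + 1, 1, ..., 1) so the powers broadcast along the time axis.
    pows = (decay ** np.arange(n + 1)).reshape((-1,) + (1,) * (x.ndim - 1))
    history = alpha * np.cumsum(x / pows[:-1], axis=0) * pows[:-1]
    return x[0] * pows[1:] + history
```

Dividing by powers of the decay factor is what makes the cumulative sum work, but those powers shrink quickly, so for very long inputs this form can lose precision compared with the loop.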

@Bing
On further investigation I found out that the solution to your question is much simpler.
Yes, there is something wrong with your line of code:

rsi_ma = SimpleMovingAverage(inputs=[RSI(inputs=[USEquityPricing.close]], window_length=14), window_length=90)  

You closed the square bracket around the inputs for SimpleMovingAverage too early, before RSI's window_length argument. It works if you change it like this:

rsi_ma = SimpleMovingAverage(inputs=[RSI(inputs=[USEquityPricing.close], window_length=14)], window_length=90)  

When I started coding my own indicators this wasn't possible, but now it is, and I learned a lot about indicators along the way.

@Tentor
Hi Tentor, this looks pretty good. This looks like something that could be used in more general cases. I will investigate.
Thanks for sharing.

I have found a simpler solution myself.
For example, the built-in MovingAverageConvergenceDivergenceSignal only provides the signal line, and you cannot feed it into the standard moving average.
However, if you derive your own MACD class from the standard one, very simply, like this:

class MovingAverageConvergenceDivergenceSignal2(MovingAverageConvergenceDivergenceSignal):  
    window_safe = True  

Then you can use MovingAverageConvergenceDivergenceSignal2 as input to the standard moving average like this:

macd_signal = MovingAverageConvergenceDivergenceSignal2(inputs=[USEquityPricing.close])  
macd = SimpleMovingAverage(inputs=[macd_signal],window_length=9)  

This works fine and you can use it in your pipeline. The only catch is that, by setting window_safe = True, you take on responsibility for guarding the integrity of the input data yourself. But most of the time this works fine.
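
A rough illustration of what "guarding the integrity of the data" can mean, assuming the concern behind window_safe is that price windows are re-adjusted (for splits, for example) as of each computation date. Under that assumption, a scale-free value such as a percent change is unaffected by the adjustment, while a value in price units is not. The helper names and the split ratio are hypothetical.

```python
import numpy as np


def percent_change(prices):
    """Scale-free: multiplying all prices by a constant leaves it unchanged."""
    return (prices[-1] - prices[0]) / prices[0]


def price_difference(prices):
    """In price units: scales with any adjustment applied to the prices."""
    return prices[-1] - prices[0]


prices = np.array([100.0, 102.0, 104.0, 106.0])
split_ratio = 0.5                # hypothetical 2-for-1 split adjustment
adjusted = prices * split_ratio  # the same history as seen after the split

same = percent_change(prices) == percent_change(adjusted)        # unaffected
before, after = price_difference(prices), price_difference(adjusted)  # 6.0 vs 3.0
```

MACD values are expressed in price units, which may be why the built-in is not flagged as window safe. That is an inference, not something confirmed in this thread, so it is worth checking the averaged output around split dates before relying on it.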

@Ber
Your solution has the advantage that it should work with any indicator. Some built-ins already seem to set window_safe (RSI has it as True by default) and others do not (MACD). My solution (i.e., modify nothing) only works with the former.