Hello,
I am working with pipeline and could use some guidance. I am trying to add the last ten prices for a stock to a pipeline, and since you can't use the history() function in pipeline, I cannot find a way to add those prices. Any help?
Create a custom factor. By definition, pipeline outputs (i.e. columns in the returned dataframe) are single values. A pipeline output cannot be an array of values, for example. Therefore you will need a separate output for each value. Maybe something like this. The attached notebook shows it in action.
class Latest_10(CustomFactor):
    """
    Gets the latest prices for each asset
    """
    inputs = [USEquityPricing.close]
    window_length = 10
    outputs = ['day1', 'day2', 'day3', 'day4', 'day5', 'day6', 'day7', 'day8', 'day9', 'day10']

    def compute(self, today, assets, out, close):
        out.day1[:] = close[-1]
        out.day2[:] = close[-2]
        out.day3[:] = close[-3]
        out.day4[:] = close[-4]
        out.day5[:] = close[-5]
        out.day6[:] = close[-6]
        out.day7[:] = close[-7]
        out.day8[:] = close[-8]
        out.day9[:] = close[-9]
        out.day10[:] = close[-10]
If you are then wanting to manipulate or compare those prices, you may be better off creating a custom factor which does that manipulation inside the pipeline.
Hope that helps.
What are you trying to do with the past n days of data? Find a max? Determine an average? Calculate a trading signal? Use the data for a TA-lib function? I gather you want to DO something with that data. As mentioned, the pipeline framework is built around doing those calculations within factors. It's not a direct replacement for the history function, which delivers the data as a raw array so that any calcs are done afterwards. The intention is to do the data manipulation within the pipeline.
Maybe give an example of what you are trying to accomplish.
I want to store the last n prices for a stock and then have a loop that takes price j and divides it by price i (j being 1 and i being 0), incrementing i and j each time, to get a distribution I can sample from.
AAPL:
Price[1] = 90
Price[0] = 92
sample = 90/92
Calculating 'sample' is straightforward:
class Sample(CustomFactor):
    inputs = [USEquityPricing.close]
    window_length = 2

    def compute(self, today, assets, out, close):
        out[:] = close[1]/close[0]
What do you mean by "increment i and j each time to get a distribution I can sample from"? Are you wanting to pull data into a notebook and manipulate it, or are you wanting to use this data for an algorithm? If you want to work within a notebook in the research environment, then maybe simply use the get_pricing or get_fundamentals methods?
I want to use it in my algorithm! I mean I want the sample to be an array of a lot of samples. The Sample class above would work, but if I had a window length of 3 I would want the output to be an array of two numbers: close[1]/close[0] and close[2]/close[1]. So when I say increment i and j, I need
out[:] = close[i]/close[j] until the window_length is reached
As mentioned, a pipe returns a single value for each asset/column. You won't be able to return an array. I suspect ultimately you don't want an array but a simple boolean trade or select signal. There must be some logic which takes the array you are looking for and turns it into an actionable value. Just put that logic into a custom factor and return that value. Perhaps I'm missing something?
So, big picture, I want to have a distribution of values that contains multiple ln(close(today)/close(yesterday)) values. The best way I thought to do that was to have all the closes for a stock within a given period of time set in an array. So say my lookback is 10: I want the last 10 daily close prices for a stock stored somewhere so I can then have a for loop that goes through every value in the array, computes ln(close(today)/close(yesterday)), and stores all those answers in some sort of array or distribution! Then I want to be able to take a random number from that sample array or distribution.
So this is giving me what I want! How would I pick one of those ten outputs from Latest_10 randomly??
class Latest_10(CustomFactor):
    """
    Gets the latest prices for each asset
    """
    inputs = [USEquityPricing.close]
    window_length = 10
    outputs = ['s1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10']

    def compute(self, today, assets, out, close):
        out.s1[:] = np.log(close[0]/close[-1])
        out.s2[:] = np.log(close[-1]/close[-2])
        out.s3[:] = np.log(close[-2]/close[-3])
        out.s4[:] = np.log(close[-3]/close[-4])
        out.s5[:] = np.log(close[-4]/close[-5])
        out.s6[:] = np.log(close[-5]/close[-6])
        out.s7[:] = np.log(close[-6]/close[-7])
        out.s8[:] = np.log(close[-7]/close[-8])
        out.s9[:] = np.log(close[-8]/close[-9])
        out.s10[:] = np.log(close[-9]/close[-10])
def initialize(context):
    """
    Called once at the start of the algorithm.
    """
    # Rebalance every day at market open.
    schedule_function(my_rebalance, date_rules.every_day(), time_rules.market_open())
    # Record tracking variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
    context.security_list = symbol('AAPL')
    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(), 'my_pipeline')

def make_pipeline():
    """
    A function to create our dynamic stock selector (pipeline). Documentation on
    pipeline can be found here: https://www.quantopian.com/help#pipeline-title
    """
    pipe = Pipeline()
    # call Latest_10 Class
    sample = Latest_10(window_length=10)
    # Add the desired values to our pipe.
    pipe.add(sample, 'sample')
    return pipe
For loops aren't very 'pythonic'. What you want to do can be accomplished in a single statement:
import numpy as np
log_returns = np.diff(np.log(my_array), axis=0)
Given an RxC array of prices, where there are R rows of days and C columns of securities, this will return a (R-1)xC array (one less day) of log returns for each security. The last row [-1] should be the latest day. This format by the way is the same format that inputs are presented in a custom factor.
Is that what you needed? You can then take your random sample from the returned 'log_returns' array. This can be done either within a custom factor, or outside of the pipeline by simply using the data.history method to fetch the data.
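To make the shapes concrete, here is a small standalone sketch that runs outside Quantopian. The price values are made up for illustration. One thing to watch: np.diff works along the last axis by default, so for a days-by-securities array the axis=0 argument is what differences consecutive days rather than neighbouring securities.

```python
import numpy as np

# Hypothetical prices: 4 days (rows) x 2 securities (columns), oldest day first.
prices = np.array([
    [100.0, 50.0],
    [110.0, 50.0],
    [121.0, 25.0],
    [121.0, 50.0],
])

# axis=0 differences consecutive days within each security's column.
log_returns = np.diff(np.log(prices), axis=0)

print(log_returns.shape)   # one row fewer than prices
print(log_returns[-1])     # most recent log return for each security
```

The last row of log_returns lines up with the last row of prices, so [-1] is still the latest day.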
Since I already did the np.log in my original custom factor, I took that part out, but when I try to run this
#call Latest_10 Class
sample = Latest_10(window_length=10)
number = np.diff(sample) #turn the Close_10 class output into an array
random = np.random.rand(number) #take random number from that array
# Add the desired values to our pipe.
pipe.add(random, 'random')
I keep getting a "list assignment index out of range" error. Any idea why?
Is there any way I could have this custom factor do something like this?
class Latest_10(CustomFactor):
    """
    Gets the latest prices for each asset
    """
    inputs = [USEquityPricing.close]
    window_length = 10
    outputs = ['random']

    def compute(self, today, assets, out, close):
        s1 = np.log(close[0]/close[-1])
        s2 = np.log(close[-1]/close[-2])
        s3 = np.log(close[-2]/close[-3])
        s4 = np.log(close[-3]/close[-4])
        s5 = np.log(close[-4]/close[-5])
        s6 = np.log(close[-5]/close[-6])
        s7 = np.log(close[-6]/close[-7])
        s8 = np.log(close[-7]/close[-8])
        s9 = np.log(close[-8]/close[-9])
        s10 = np.log(close[-9]/close[-10])
        sample = ['s1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10']
        out.random[:] = np.random.rand(sample)
I believe I understand what you are looking for. There may still be some misunderstanding, but maybe this is closer?
What I believe you are looking for:
- a factor which returns a single log-return for each security.
- the log-return is selected at random from the security's previous n returns.
- the log-returns are calculated from the daily close prices of each security.
This should do it...
import numpy as np

class Random_Return(CustomFactor):
    """
    This factor returns a single log return for each security.
    This return is selected at random from the previous n returns.
    The returns are calculated from the daily close prices of each security.
    n+1 is the window_length. If you want to select from a different window, then
    simply set the window_length to a different value.
    """
    inputs = [USEquityPricing.close]
    window_length = 10

    def compute(self, today, assets, out, close):
        log_returns = np.diff(np.log(close), axis=0)
        my_choices = np.apply_along_axis(np.random.choice, 0, log_returns)
        out[:] = my_choices
The numpy 'random.choice' method does the heavy lifting in doing the random selection. Check out https://docs.scipy.org/doc/numpy/reference/generated/numpy.random.choice.html#numpy.random.choice for more details.
Open the attached notebook and play with the final cell to see it in action. Note that I didn't try this in an algorithm but assume it should work the same.
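For anyone who wants to try the selection step without the pipeline machinery, here is a minimal standalone sketch. It is an alternative formulation, not the factor above: instead of calling random.choice once per column, it draws one random row index per security and picks that entry with fancy indexing. The close array, its size, and the seed are made up for illustration.

```python
import numpy as np

rng = np.random.RandomState(0)  # seeded only so the sketch is repeatable

# Hypothetical window: 10 days (rows) x 4 securities (columns).
close = 100.0 + rng.rand(10, 4)
log_returns = np.diff(np.log(close), axis=0)      # shape (9, 4)

n_days, n_assets = log_returns.shape
# One random row index per security, then take that row within each column.
rows = rng.randint(0, n_days, size=n_assets)
picks = log_returns[rows, np.arange(n_assets)]    # shape (4,)

print(picks.shape)
```

Either way the result has one value per security, which is the shape that out[:] expects.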
I'm getting this error. Any idea why?
"ValueError: cannot copy sequence with size 1000 to array axis with dimension 8341"
class Last_Close_10(CustomFactor):
    """
    Gets the latest prices for each asset
    """
    inputs = [USEquityPricing.close]
    window_length = 10
    outputs = ['yesterday','projection']

    def compute(self, today, assets, out, close):
        # gets log return from last 10 close prices
        s1 = np.log(close[0]/close[-1])
        s2 = np.log(close[-1]/close[-2])
        s3 = np.log(close[-2]/close[-3])
        s4 = np.log(close[-3]/close[-4])
        s5 = np.log(close[-4]/close[-5])
        s6 = np.log(close[-5]/close[-6])
        s7 = np.log(close[-6]/close[-7])
        s8 = np.log(close[-7]/close[-8])
        s9 = np.log(close[-8]/close[-9])
        s10 = np.log(close[-9]/close[-10])
        # puts all of the log returns into an array
        sample = [s1, s2, s3, s4, s5, s6, s7, s8, s9, s10]
        # pick a random number from the array (sample)
        random_number = random.choice(sample)
        # project tomorrows price
        count = 0
        num = 0
        monte_carlo = []
        for num in range(0,1000):
            i = close[-1] * np.exp(random_number)
            tomorrow_projection = 0
            for count in range(0, 10):
                tomorrow_projection = i * np.exp(random_number)
                i = tomorrow_projection
                count = count + 1
            monte_carlo.append(i)
            num = num + 1
        # output (yesterdays price, projection of tomorrows price)
        out.yesterday[:] = close[-1]
        out.projection[:] = monte_carlo
The error is exactly what it says: "cannot copy sequence with size 1000 to array axis with dimension 8341". It comes from this line:
out.projection[:] = monte_carlo
The 'monte_carlo' object is a list with 1000 entries. Each entry 'i' is an array of your modified close prices with one value per security, 8341 in all. The list was built in the for loop by the statement 'monte_carlo.append(i)'.
The 'out.projection' object is a numpy array holding a single row of 8341 values, one for each security, which you need to fill in somehow.
You are trying to assign 1000 rows of 8341 values to a single row of 8341 values, and numpy doesn't know how you want the 1000 rows reduced to one.
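The mismatch can be reproduced in a few lines outside of pipeline. This is only a sketch: the array sizes and values are made up, out_projection stands in for out.projection, and taking the mean over the simulations is just one assumed way of reducing 1000 rows to one value per security. The exact wording of the error message also varies between numpy versions.

```python
import numpy as np

n_assets = 5      # stands in for the 8341 securities
n_sims = 1000

out_projection = np.empty(n_assets)   # stand-in for out.projection

# A list of 1000 arrays, each holding one value per security.
monte_carlo = [np.full(n_assets, float(k)) for k in range(n_sims)]

failed = False
try:
    out_projection[:] = monte_carlo   # 1000 rows into 1 row: shape mismatch
except ValueError as err:
    failed = True
    print(err)

# Reduce across the simulation axis first, so one value per security remains.
stacked = np.asarray(monte_carlo)          # shape (1000, 5)
out_projection[:] = stacked.mean(axis=0)   # shape (5,)
```

Whatever reduction you choose (mean, median, a percentile), it has to run along axis 0 so that the result has exactly one entry per security.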
I'm getting NaN when printing out the std for the list. Any idea why?
class Last_Close_30(CustomFactor):
    """
    Gets the latest prices for each asset
    """
    inputs = [USEquityPricing.close]
    window_length = 30
    outputs = ['yesterday','projection','difference','standard']

    def compute(self, today, assets, out, close):
        a = -0
        b = -1
        look_back = 0
        sample = []
        # gets log return from last 30 close prices and puts in array
        for look_back in range(0,30):
            logs = np.log(close[a]/close[b])
            sample.append(logs)
            a = a-1
            b = b-1
            look_back = look_back + 1
        # pick a random number from the array (sample)
        random_number = random.choice(sample)
        # project 10 time steps ahead and make projected price 10 days out
        # that process done 1000 times (monte carlo)
        count = 0
        num = 0
        monte_carlo = []
        for num in range(0,1000):
            i = close[-1] * np.exp(random_number)
            tomorrow_projection = 0
            for count in range(0, 10):
                tomorrow_projection = i * np.exp(random_number)
                i = tomorrow_projection
                count = count + 1
            monte_carlo.append(i)
            num = num + 1
        # average close prices for each stock
        average = (sum(monte_carlo))/1000
        # difference from average
        difference = average - close[-1]
        std_mc = np.std(monte_carlo) #NOT WORKING
        # output (yesterdays price, projection of tomorrows price)
        out.yesterday[:] = close[-1]
        out.projection[:] = average
        out.difference[:] = difference
        out.standard[:] = std_mc
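One possible cause, sketched here under assumptions rather than confirmed against the code above: np.std without an axis argument collapses the whole set of simulations into a single scalar, so if any security has a NaN price anywhere in the window, that one NaN makes the scalar NaN too. Passing axis=0 gives one standard deviation per security instead. The array below is made up for illustration.

```python
import numpy as np

# Hypothetical simulated prices: 3 runs (rows) x 3 securities (columns).
# The last security has no price data, so its whole column is NaN.
sims = np.array([
    [10.0, 20.0, np.nan],
    [12.0, 20.0, np.nan],
    [14.0, 20.0, np.nan],
])

overall = np.std(sims)             # a single scalar over every entry
per_asset = np.std(sims, axis=0)   # one value per security

print(overall)     # NaN, because one column is NaN
print(per_asset)   # NaN only for the security with missing prices
```

Separately, random_number is drawn once before the loop in the factor above, so all 1000 runs produce the same projection. Even with axis=0 the std would then come out as zero for every security with valid prices, so the draw probably needs to happen inside the loop.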