I've created a few functions that might help you out.
The following function takes a list of dates (timestamps), runs the pipeline for each date, and returns a single merged DataFrame of the results.
def run_pipeline_list(pipe, pipeline_dates):
    """Run a pipeline one date at a time and merge the per-date results.

    Drop-in replacement for run_pipeline. run_pipeline fails over a very long
    period of time (memory usage), so the pipeline is run separately for each
    date and the results are concatenated.

    Parameters
    ----------
    pipe :
        Pipeline object, passed straight through to ``run_pipeline``.
    pipeline_dates : iterable
        Indexable date containers; element ``[0]`` of each one is used as both
        the start and the end date (e.g. the single-element DatetimeIndex
        objects produced by ``dt_intervals``).

    Returns
    -------
    DataFrame or list
        The concatenated DataFrame, or, if ``pd.concat`` fails (for example
        when ``pipeline_dates`` is empty), the list of per-date DataFrames.
    """
    chunks = []
    for date_idx in pipeline_dates:
        day = date_idx[0]
        print("Processing {:.10} pipeline".format(str(day)))
        results = run_pipeline(pipe, day, day)
        # Convert category dtype to object before concatenating; presumably
        # to avoid category-mismatch issues between chunks (TODO confirm).
        for col in results.select_dtypes(include=['category']).columns:
            results[col] = results[col].astype('O')
        print('shape: {}\n'.format(results.shape))
        chunks.append(results)
    try:
        combined = pd.concat(chunks)
    except Exception as err:
        # Best effort: hand back the raw chunks instead of losing the work,
        # but let KeyboardInterrupt/SystemExit propagate (bare except did not).
        print('\npd.concat failed: {}'.format(err))
        return chunks
    # Only report success once the concat has actually succeeded.
    print('\nCombined dataframe created')
    return combined
The next function generates a list of dates (e.g. the Tuesday of every week, skipping weekends and US federal holidays) to feed into the function above. It can easily be adapted to produce end-of-month dates instead.
def dt_intervals(beg_date, end_date, day_index):
    """Create one pipeline date per trading week, to save on memory.

    Trading days are the days in [beg_date, end_date] that are neither
    Saturdays/Sundays nor US federal holidays (USFederalHolidayCalendar).
    They are grouped by ISO (year, week), so a week that straddles New Year
    is treated as one week. For each week, the day whose weekday equals
    ``day_index`` is chosen. If that day is not a trading day inside the
    range (a holiday, or cut off by beg_date/end_date), the last trading
    day of that week is used instead.

    Parameters
    ----------
    beg_date, end_date :
        Inclusive range bounds; anything accepted by ``pd.date_range``.
    day_index : int
        Day of the week to run calculations:
        0-mon, 1-tues, 2-wed, 3-thu, 4-fri.

    Returns
    -------
    list of DatetimeIndex
        One single-element DatetimeIndex per week, in chronological order.
        The date is read as ``d[0]`` (as ``run_pipeline_list`` does).
        Returns an empty list when the range contains no days.
    """
    trng = pd.date_range(beg_date, end_date)
    if len(trng) == 0:
        # beg_date > end_date: the holiday lookup would get NaT bounds.
        return []
    holidays = USFederalHolidayCalendar().holidays(start=trng.min(), end=trng.max())
    # Exclude US holidays, then Saturdays (5) and Sundays (6).
    trading_days = trng[~trng.isin(holidays)]
    trading_days = trading_days[trading_days.weekday < 5]

    # Group positions by ISO (year, week). Grouping by calendar year + ISO
    # week (the previous approach) merged late-December days that belong to
    # ISO week 1 of the next year with early-January days of the same year.
    # Timestamp.isocalendar() is datetime.isocalendar(), which is available
    # on every pandas version (DatetimeIndex.week was removed in pandas 2.0).
    positions_by_week = {}
    for pos, ts in enumerate(trading_days):
        positions_by_week.setdefault(tuple(ts.isocalendar()[:2]), []).append(pos)

    pipeline_dates = []
    # Sorting the (iso_year, iso_week) keys yields chronological order.
    for week_key in sorted(positions_by_week):
        week_days = trading_days[positions_by_week[week_key]]
        day = week_days[week_days.weekday == day_index]
        if len(day) == 1:
            pipeline_dates.append(day)
        else:
            # Requested weekday is missing this week: use the last trading day.
            pipeline_dates.append(week_days[week_days.weekday == week_days.weekday.max()])
    return pipeline_dates