У меня есть кадр данных fulldb_accrep_united
такого типа:
SparkID ... Period
0 913955 ... {"@PeriodName": "2000", "@DateBegin": "2000-01...
1 913955 ... {"@PeriodName": "1999", "@DateBegin": "1999-01...
2 16768 ... {"@PeriodName": "2007", "@DateBegin": "2007-01...
3 16768 ... {"@PeriodName": "2006", "@DateBegin": "2006-01...
4 16768 ... {"@PeriodName": "2005", "@DateBegin": "2005-01...
Мне нужно преобразовать столбец Period
, который теперь является столбцом строк, в столбец значений json
. Обычно я делаю это с df.apply(lambda x: json.loads(x))
, но этот датафрейм слишком велик, чтобы обработать его целиком. Я хочу использовать dask
, но, кажется, упускаю что-то важное. Кажется, я не понимаю, как использовать apply
в dask
, но не могу найти решение.
Коды
Вот как я должен это сделать, если использую Pandas со всеми df в памяти:
#%% read df
os.chdir('/opt/data/.../download finance/output')
fulldb_accrep_united = pd.read_csv('fulldb_accrep_first_download_raw_quotes_corrected.csv', index_col = 0, encoding = 'utf-8')
os.chdir('..')
#%% Deleting some freaky symbols from column
condition = fulldb_accrep_united['Period'].str.contains('\\xa0', na = False, regex = False)
fulldb_accrep_united.loc[condition.values, 'Period'] = fulldb_accrep_united.loc[condition.values, 'Period'].str.replace('\\xa0', ' ', regex = False).values
#%% Convert to json
fulldb_accrep_united.loc[fulldb_accrep_united['Period'].notnull(), 'Period'] = fulldb_accrep_united['Period'].dropna().apply(lambda x: json.loads(x))
Это код, в котором я пытаюсь использовать dask
:
#%% load data with dask
os.chdir('/opt/data/.../download finance/output')
fulldb_accrep_united = dd.read_csv('fulldb_accrep_first_download_raw_quotes_corrected.csv', encoding = 'utf-8', blocksize = 16 * 1024 * 1024) #16Mb chunks
os.chdir('..')
#%% setup calculation graph. No work is done here.
def transform_to_json(df):
condition = df['Period'].str.contains('\\xa0', na = False, regex = False)
df['Period'] = df['Period'].mask(condition.values, df['Period'][condition.values].str.replace('\\xa0', ' ', regex = False).values)
condition2 = df['Period'].notnull()
df['Period'] = df['Period'].mask(condition2.values, df['Period'].dropna().apply(lambda x: json.loads(x)).values)
result = transform_to_json(fulldb_accrep_united)
Последняя ячейка здесь дает ошибку:
NotImplementedError: Series getitem in only supported for other series objects with matching partition structure
Что я делаю неправильно? Я пытался найти похожие темы почти 5 часов, но я думаю, что упускаю что-то важное, потому что я новичок в этой теме.