나는 1,500 만 개가 넘는 행에서 데이터 프레임에 대해 팬더 작업을 정기적으로 수행하며 특정 작업에 대한 진행률 표시기에 액세스하고 싶습니다.
팬더 분할 적용 조합 작업에 대한 텍스트 기반 진행률 표시기가 있습니까?
예를 들면 다음과 같습니다.
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
여기서 feature_rollup
많은 DF 열을 사용하고 다양한 방법을 통해 새로운 사용자 열을 생성하는 다소 관련된 함수가 있습니다. 이 작업은 큰 데이터 프레임의 경우 시간이 걸릴 수 있으므로 진행 상황을 업데이트하는 iPython 노트북에 텍스트 기반 출력이 가능한지 알고 싶습니다.
지금까지 파이썬에 대한 표준 루프 진행률 표시기를 시도했지만 의미있는 방식으로 팬더와 상호 작용하지 않습니다.
나는 판다 라이브러리 / 문서에서 간과 된 결합 조합의 진행 상황을 알 수있는 것을 간절히 바라고 있습니다. 간단한 구현은 apply
함수가 작동 하는 총 데이터 프레임 서브 세트 수를 보고 해당 서브 세트의 완료된 부분으로 진행률을보고합니다.
라이브러리에 추가해야 할 것입니까?
답변
대중적인 수요로 인해에 대한 tqdm
지원이 추가되었습니다 pandas
. 다른 답변과 달리 팬더 속도가 눈에 띄게 느려지지는 않습니다 . 예를 들면 다음과 DataFrameGroupBy.progress_apply
같습니다.
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm # for notebooks
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)
이것이 어떻게 작동하는지 (그리고 자신의 콜백을 위해 수정하는 방법)에 관심이 있다면 github 의 예제 , pypi에 대한 전체 문서를 참조 하거나 모듈을 가져 와서 실행하십시오 help(tqdm)
.
편집하다
원래 질문에 직접 대답하려면 다음을 바꾸십시오.
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
와:
from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
참고 : tqdm <= v4.8 : tqdm 버전이 4.8 미만인 경우 다음 tqdm.pandas()
을 수행해야합니다.
from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
답변
Jeff의 대답을 조정하고 (재사용 가능한 기능으로 사용).
def logged_apply(g, func, *args, **kwargs):
step_percentage = 100. / len(g)
import sys
sys.stdout.write('apply progress: 0%')
sys.stdout.flush()
def logging_decorator(func):
def wrapper(*args, **kwargs):
progress = wrapper.count * step_percentage
sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
sys.stdout.flush()
wrapper.count += 1
return func(*args, **kwargs)
wrapper.count = 0
return wrapper
logged_func = logging_decorator(func)
res = g.apply(logged_func, *args, **kwargs)
sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
sys.stdout.flush()
return res
참고 : 적용 진행률 업데이트는 인라인 입니다. 함수가 stdout이면 작동하지 않습니다.
In [11]: g = df_users.groupby(['userID', 'requestDate'])
In [12]: f = feature_rollup
In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]:
...
평소와 같이이를 방법으로 groupby 객체에 추가 할 수 있습니다.
from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply
In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]:
...
의견에서 언급했듯이 이것은 핵심 팬더가 구현에 관심이있는 기능이 아닙니다. 그러나 파이썬을 사용하면 많은 팬더 객체 / 메소드에 대해 이것을 만들 수 있습니다 (이 방법은 일반화 할 수는 있지만 상당히 많은 작업 일 것입니다).
답변
Jupyter / ipython 노트북에서이 기능을 사용하는 방법에 대한 지원이 필요한 경우 다음과 같이 유용한 기사 와 관련 기사를 제공합니다 .
from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)
에 대한 import 문에서 밑줄을 확인하십시오 _tqdm_notebook
. 참조 된 기사에서 언급했듯이 개발은 베타 단계에서 진행 중입니다.
답변
사용자 정의 병렬 팬더 적용 코드에 tqdm을 적용하려는 모든 사람에게 적합합니다.
(수년 동안 병렬화를 위해 라이브러리 중 일부를 시도했지만 주로 적용 기능에 대한 100 % 병렬화 솔루션을 찾지 못했으며 항상 “수동”코드로 돌아와야했습니다.
df_multi_core- 이것이 당신이 부르는 것입니다. 받아들입니다 :
- df 객체
- 호출하려는 함수 이름
- 함수를 수행 할 수있는 열의 하위 집합 (시간 / 메모리 감소에 도움이 됨)
- 병렬로 실행할 작업 수 (-1 또는 모든 코어에서 생략)
- df의 함수가 받아들이는 다른 kwargs (예 : “axis”)
_df_split- 실행중인 모듈 (Pool.map은 “배치에 따라 다름”)에 전역으로 배치해야하는 내부 도우미 함수입니다. 그렇지 않으면 내부적으로 찾습니다 ..
여기 내 요지 코드가 있습니다 (팬더 기능 테스트를 더 추가 할 것입니다).
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))
def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)
pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results
Bellow는 tqdm “progress_apply” 를 사용한 병렬 적용 을 위한 테스트 코드입니다 .
from time import time
from tqdm import tqdm
tqdm.pandas()
if __name__ == '__main__':
sep = '-' * 50
# tqdm progress_apply test
def apply_f(row):
return row['c1'] + 0.1
N = 1000000
np.random.seed(0)
df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})
print('testing pandas apply on {}\n{}'.format(df.shape, sep))
t1 = time()
res = df.progress_apply(apply_f, axis=1)
t2 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))
t3 = time()
# res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
t4 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
출력에서 병렬화없이 실행하기위한 진행률 표시 줄 1 개와 병렬화로 실행할 때 코어 당 진행률 표시 줄을 볼 수 있습니다. 약간의 hickup이 있고 때로는 나머지 코어가 한 번에 나타나지만 코어 당 진행 통계 (예 : 초당 / 총 레코드 수)를 얻었으므로 유용하다고 생각합니다.
이 훌륭한 도서관에 대해 @abcdaa에게 감사합니다!
답변
데코레이터로 쉽게 할 수 있습니다
from functools import wraps
def logging_decorator(func):
@wraps
def wrapper(*args, **kwargs):
wrapper.count += 1
print "The function I modify has been called {0} times(s).".format(
wrapper.count)
func(*args, **kwargs)
wrapper.count = 0
return wrapper
modified_function = logging_decorator(feature_rollup)
그런 다음 modified_function을 사용하십시오 (그리고 인쇄 할 때 변경하십시오)
답변
Jeff의 답변 을 총계를 포함하도록 변경 하여 진행률과 변수를 추적하여 모든 X 반복을 인쇄 할 수 있습니다 ( “print_at”가 상당히 높은 경우 실제로 성능을 크게 향상시킵니다)
def count_wrapper(func,total, print_at):
def wrapper(*args):
wrapper.count += 1
if wrapper.count % wrapper.print_at == 0:
clear_output()
sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
sys.stdout.flush()
return func(*args)
wrapper.count = 0
wrapper.total = total
wrapper.print_at = print_at
return wrapper
clear_output () 함수는
from IPython.core.display import clear_output
IPython에 없다면 Andy Hayden의 대답은 그것을하지 않고 그렇게합니다.