[pandas] Pandas DataFrame apply ()가 모든 코어를 사용하게 하시겠습니까?

2017 년 8 월 현재 Pandas DataFame.apply () 는 여전히 단일 코어 작업으로 제한되어 있습니다. 즉, 멀티 코어 머신은 df.apply(myfunc, axis=1).

모든 코어를 사용하여 데이터 프레임에서 병렬로 적용하는 방법은 무엇입니까?



답변

다음 swifter패키지를 사용할 수 있습니다 .

pip install swifter

pandas 용 플러그인으로 작동하여 apply함수 를 재사용 할 수 있습니다 .

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

벡터화 여부 (위의 예에서와 같이) 여부에 관계없이 함수를 병렬화하는 가장 효율적인 방법을 자동으로 알아냅니다.

GitHub에서 더 많은 예제성능 비교 를 확인할 수 있습니다. 패키지는 현재 개발 중이므로 API가 변경 될 수 있습니다.

또한 이것은 문자열 열에 대해 자동으로 작동하지 않습니다 . 문자열을 사용할 때 Swifter는 apply병렬이 아닌 “단순한”Pandas로 대체됩니다 . 이 경우 강제로 사용하더라도 dask성능이 향상되지는 않으며 데이터 세트를 수동으로 분할하고 .NET을 사용하여 병렬화하는 것이multiprocessing 좋습니다.


답변

가장 간단한 방법은 Dask의 map_partitions 를 사용하는 입니다. 다음 가져 오기가 필요합니다 ( pip install dask).

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

구문은

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)

(코어가 16 개인 경우 30이 적절한 파티션 수라고 생각합니다.) 완전성을 위해 내 컴퓨터 (16 코어)의 차이를 측정했습니다.

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

주기 (10)의 단축의 요인을 판다에서 진행 파티션에 적용 DASK에 적용됩니다. 물론, 벡터화 할 수있는 함수가 있다면,이 경우 함수 ( y*(x**2+1))는 사소하게 벡터화되지만 벡터화가 불가능한 것들이 많이 있습니다.


답변

pandarallel대신 시도해 볼 수 있습니다 . 모든 CPU에서 Pandas 작업을 병렬화하는 간단하고 효율적인 도구 (Linux 및 macOS에서)

  • 병렬화에는 비용이 발생하므로 (새 프로세스 인스턴스화, 공유 메모리를 통해 데이터 전송 등) 병렬화 할 계산량이 충분히 높은 경우에만 병렬화가 효율적입니다. 아주 적은 양의 데이터에 대해 항상 그만한 가치가있는 것은 아닙니다.
  • 적용된 함수는 람다 함수가 아니어야합니다.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

참조 https://github.com/nalepae/pandarallel를


답변

네이티브 파이썬에 머물고 싶다면 :

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

데이터 프레임의 f열에 병렬 방식으로 기능 을 적용합니다 .coldf


답변

다음은 pandas 적용이 병렬화 된 sklearn 기본 변환기의 예입니다.

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

자세한 내용은 https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8을 참조 하십시오.


답변

모든 (물리적 또는 논리적) 코어를 사용하려면 및 mapply의 대안으로 시도 할 수 있습니다.swifterpandarallel

초기화시 코어 수 (및 청킹 동작)를 설정할 수 있습니다.

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

기본적으로 ( n_workers=-1) 패키지는 시스템에서 사용 가능한 모든 물리적 CPU를 사용합니다. 시스템에서 하이퍼 스레딩을 사용하는 경우 (일반적으로 물리적 CPU 양의 두 배가 표시됨)은 ( mapply는 ) 시스템의 다른 프로세스보다 다중 처리 풀의 우선 순위를 지정하는 추가 작업자를 생성합니다.

의 정의에 따라 all your cores모든 논리 코어를 대신 사용할 수도 있습니다 (이와 같이 CPU 바운드 프로세스가 물리적 CPU를 놓고 싸우게되므로 작업 속도가 느려질 수 있음).

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)


답변