[python] 그룹화 된 Pandas DataFrame에 함수를 병렬로 효율적으로 적용

매우 큰 그룹 DataFrame(혼합 데이터 유형)에 함수를 적용해야하는 경우가 많으며 여러 코어를 활용하고 싶습니다.

그룹에서 반복자를 만들고 다중 처리 모듈을 사용할 수 있지만 모든 그룹과 함수의 결과를 프로세스 간의 메시징을 위해 선택해야하므로 효율적이지 않습니다.

산세를 피하거나 DataFrame완전히 복사하는 것을 피할 수있는 방법이 있습니까? 다중 처리 모듈의 공유 메모리 기능이 numpy배열 로 제한되어있는 것 같습니다 . 다른 옵션이 있습니까?



답변

위의 의견에서 이것은 pandas당분간 계획된 것 같습니다 ( 방금 눈치 챈 흥미로운 rosetta프로젝트 도 있습니다 ).

그러나 모든 병렬 기능이에 통합 될 때까지 + OpenMP 및 C ++ pandaspandas직접 사용하여 효율적이고 비 메모리 복사 병렬 증가를 작성하는 것이 매우 쉽다는 것을 알았습니다 .cython

다음은 병렬 groupby-sum을 작성하는 간단한 예입니다. 사용은 다음과 같습니다.

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

출력은 다음과 같습니다.

     sum
key
0      6
1      11
2      4

참고 의심 할 여지없이이 간단한 예제의 기능은 결국 pandas. 그러나 어떤 것들은 당분간 C ++에서 병렬화하는 것이 더 자연 스러울 것이며, 이것을 .NET Framework로 결합하는 것이 얼마나 쉬운 지 아는 것이 중요합니다 pandas.


이를 위해 코드가 따르는 간단한 단일 소스 파일 확장자를 작성했습니다.

일부 가져 오기 및 유형 정의로 시작합니다.

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C ++ unordered_map유형은 단일 스레드 vector로 합산하기위한 것이고 는 모든 스레드에서 합산하기위한 것입니다.

이제 기능 sum. 빠른 액세스를 위해 입력 된 메모리 뷰로 시작 합니다.

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

함수는 스레드를 반동 등하게 나누고 (여기서는 4로 하드 코딩 됨) 각 스레드가 해당 범위의 항목을 합산하도록합니다.

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True):
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

스레드가 완료되면 함수는 (다른 범위의) 모든 결과를 단일로 병합합니다 unordered_map.

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)

남은 것은를 만들고 DataFrame결과를 반환하는 것입니다.

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df


답변