매우 큰 그룹 DataFrame
(혼합 데이터 유형)에 함수를 적용해야하는 경우가 많으며 여러 코어를 활용하고 싶습니다.
그룹에서 반복자를 만들고 다중 처리 모듈을 사용할 수 있지만 모든 그룹과 함수의 결과를 프로세스 간의 메시징을 위해 선택해야하므로 효율적이지 않습니다.
산세를 피하거나 DataFrame
완전히 복사하는 것을 피할 수있는 방법이 있습니까? 다중 처리 모듈의 공유 메모리 기능이 numpy
배열 로 제한되어있는 것 같습니다 . 다른 옵션이 있습니까?
답변
위의 의견에서 이것은 pandas
당분간 계획된 것 같습니다 ( 방금 눈치 챈 흥미로운 rosetta
프로젝트 도 있습니다 ).
그러나 모든 병렬 기능이에 통합 될 때까지 + OpenMP 및 C ++ pandas
를 pandas
직접 사용하여 효율적이고 비 메모리 복사 병렬 증가를 작성하는 것이 매우 쉽다는 것을 알았습니다 .cython
다음은 병렬 groupby-sum을 작성하는 간단한 예입니다. 사용은 다음과 같습니다.
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
출력은 다음과 같습니다.
sum
key
0 6
1 11
2 4
참고 의심 할 여지없이이 간단한 예제의 기능은 결국 pandas
. 그러나 어떤 것들은 당분간 C ++에서 병렬화하는 것이 더 자연 스러울 것이며, 이것을 .NET Framework로 결합하는 것이 얼마나 쉬운 지 아는 것이 중요합니다 pandas
.
이를 위해 코드가 따르는 간단한 단일 소스 파일 확장자를 작성했습니다.
일부 가져 오기 및 유형 정의로 시작합니다.
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
C ++ unordered_map
유형은 단일 스레드 vector
로 합산하기위한 것이고 는 모든 스레드에서 합산하기위한 것입니다.
이제 기능 sum
. 빠른 액세스를 위해 입력 된 메모리 뷰로 시작 합니다.
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
함수는 스레드를 반동 등하게 나누고 (여기서는 4로 하드 코딩 됨) 각 스레드가 해당 범위의 항목을 합산하도록합니다.
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
스레드가 완료되면 함수는 (다른 범위의) 모든 결과를 단일로 병합합니다 unordered_map
.
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
남은 것은를 만들고 DataFrame
결과를 반환하는 것입니다.
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df