[python] 팬더를 사용한“대형 데이터”작업 흐름

팬더를 배우면서 수개월 동안이 질문에 대한 답을 찾기 위해 노력했습니다. 나는 일상적인 업무에 SAS를 사용하며 핵심 지원에 적합합니다. 그러나 SAS는 여러 가지 이유로 소프트웨어의 일부로 끔찍합니다.

언젠가 SAS 사용을 파이썬과 팬더로 바꾸고 싶지만 현재는 대규모 데이터 세트에 대한 핵심 작업 흐름이 부족합니다. 분산 네트워크를 필요로하는 “빅 데이터”에 대해 말하는 것이 아니라 메모리에 맞추기에는 너무 크지 만 하드 드라이브에는 맞지 않는 작은 파일입니다.

내 첫 번째 생각은 HDFStore디스크에 큰 데이터 세트를 보유하고 분석을 위해 필요한 부분 만 데이터 프레임으로 가져 오는 데 사용 하는 것입니다. 다른 사람들은 MongoDB를 사용하기 쉬운 대안으로 언급했습니다. 내 질문은 이것입니다 :

다음을 수행하기위한 모범 사례 워크 플로우는 무엇입니까?

  1. 플랫 파일을 영구적 인 온 디스크 데이터베이스 구조로로드
  2. 팬더 데이터 구조에 피드 할 데이터를 검색하기 위해 해당 데이터베이스 조회
  3. 팬더에서 조각을 조작 한 후 데이터베이스 업데이트

실제 사례는 특히 “대형 데이터”에서 팬더를 사용하는 모든 사람으로부터 높이 평가 될 것입니다.

편집-이것이 어떻게 작동하는지의 예 :

  1. 큰 플랫 파일을 반복적으로 가져 와서 디스크상의 영구 데이터베이스 구조에 저장하십시오. 이러한 파일은 일반적으로 너무 커서 메모리에 맞지 않습니다.
  2. 팬더를 사용하기 위해 메모리에 맞는이 데이터의 하위 집합 (일반적으로 한 번에 몇 열)을 읽으려고합니다.
  3. 선택한 열에서 다양한 작업을 수행하여 새 열을 만듭니다.
  4. 그런 다음 이러한 새 열을 데이터베이스 구조에 추가해야합니다.

이 단계를 수행하는 가장 좋은 방법을 찾으려고합니다. 팬더 및 파이 테이블에 대한 링크를 읽으면 새 열을 추가하는 것이 문제가 될 수 있습니다.

편집-Jeff의 질문에 구체적으로 응답 :

  1. 소비자 신용 리스크 모델을 구축 중입니다. 데이터의 종류에는 전화, SSN 및 주소 특성이 있습니다. 특성 값; 범죄 기록, 파산 등과 같은 경멸적인 정보 … 매일 사용하는 데이터 집합에는 평균 및 혼합 데이터 유형 (숫자 및 문자 데이터의 연속, 명목 및 순서 변수)에 대해 거의 1,000 ~ 2,000 개의 필드가 있습니다. 행을 거의 추가하지 않지만 새 열을 만드는 많은 작업을 수행합니다.
  2. 일반적인 작업에는 조건부 논리를 사용하여 여러 열을 새로운 복합 열로 결합하는 작업이 포함됩니다. 예를 들면 다음과 같습니다 if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. 이러한 작업의 결과는 내 데이터 세트의 모든 레코드에 대한 새로운 열입니다.
  3. 마지막으로, 이러한 새로운 열을 디스크상의 데이터 구조에 추가하고 싶습니다. 2 단계를 반복하면서 크로스 탭과 설명 통계로 데이터를 탐색하여 흥미롭고 직관적 인 모델 관계를 찾으려고합니다.
  4. 일반적인 프로젝트 파일은 일반적으로 약 1GB입니다. 파일은 행이 소비자 데이터 레코드로 구성되는 방식으로 구성됩니다. 각 행에는 모든 레코드에 대해 동일한 수의 열이 있습니다. 항상 그렇습니다.
  5. 새 열을 만들 때 행별로 하위 집합을 만드는 것은 매우 드 rare니다. 그러나 보고서를 만들거나 설명 통계를 생성 할 때 행의 하위 집합을 만드는 것이 일반적입니다. 예를 들어 소매 신용 카드와 같이 특정 비즈니스 라인에 대해 간단한 빈도를 만들고 싶을 수 있습니다. 이를 위해보고하려는 열과 함께 비즈니스 라인 = 소매점 인 레코드 만 선택합니다. 그러나 새 열을 만들 때는 모든 데이터 행과 작업에 필요한 열만 가져옵니다.
  6. 모델링 프로세스에서는 모든 열을 분석하고 일부 결과 변수와 흥미로운 관계를 찾고 이러한 관계를 설명하는 새로운 복합 열을 만들어야합니다. 내가 탐색하는 열은 일반적으로 작은 세트로 수행됩니다. 예를 들어, 속성 값을 다루는 20 개의 열 집합에 중점을두고 대출의 채무 불이행과 어떻게 관련되는지 살펴 보겠습니다. 일단 그것들이 탐구되고 새로운 칼럼이 만들어지면 나는 다른 칼럼 그룹으로 이동하여 대학 교육을 말하고 과정을 반복합니다. 내가하고있는 일은 내 데이터와 일부 결과 간의 관계를 설명하는 후보 변수를 만드는 것입니다. 이 과정의 마지막 부분에서, 나는 복합 칼럼에서 방정식을 만드는 학습 기술을 적용합니다.

데이터 집합에 행을 추가하는 경우는 거의 없습니다. 나는 거의 항상 새로운 열 (통계 / 기계 학습 용어의 변수 또는 기능)을 만들 것입니다.



답변

나는 이런 방식으로 일상적으로 수십 기가 바이트의 데이터를 사용합니다. 예를 들어, 디스크를 통해 쿼리를 통해 읽고, 데이터를 만들고, 다시 추가하는 테이블이 있습니다.

그것의 가치가 읽기 워드 프로세서후반이 스레드에 데이터를 저장하는 방법에 대한 몇 가지 제안.

데이터 저장 방법에 영향을 미치는 세부 사항은 다음과 같습니다.
최대한 상세하게 설명하십시오. 구조 개발을 도와 드릴 수 있습니다.

  1. 데이터 크기, 행 수, 열, 열 유형; 행을 추가합니까, 아니면 열만 추가합니까?
  2. 일반적인 작업은 어떻게됩니까? 예를 들어 열에 대한 쿼리를 수행하여 여러 행과 특정 열을 선택한 다음 작업 (메모리 내)을 수행하고 새 열을 만들고 저장하십시오.
    장난감 예제를 제공하면 더 구체적인 권장 사항을 제공 할 수 있습니다.
  3. 그 처리 후, 당신은 무엇을합니까? 2 단계는 특별합니까, 아니면 반복 가능합니까?
  4. 플랫 파일 입력 : Gb의 총 크기는 몇 개입니까? 예를 들어 레코드별로 어떻게 구성되어 있습니까? 각 필드에 다른 필드가 포함되어 있습니까, 아니면 각 파일에 모든 필드가있는 파일 당 레코드가 있습니까?
  5. 기준에 따라 행의 하위 세트 (레코드)를 선택 했습니까 (예 : 필드 A> 5 인 행 선택)? 그런 다음 무언가를 수행하거나 모든 레코드가있는 필드 A, B, C를 선택하고 무언가를 수행합니까?
  6. 모든 열 (그룹)을 ‘작업’하거나 보고서에만 사용할 수있는 적절한 비율이 있습니까 (예 : 데이터를 유지하고 싶지만 해당 열을 명시 적으로 가져올 필요는 없습니다. 최종 결과 시간)?

해결책

팬더가 적어도0.10.1 설치되어 있는지 확인하십시오 .

청크 단위의 반복 파일여러 테이블 쿼리를 읽습니다 .

pytables는 쿼리 방식으로 행 단위로 작동하도록 최적화되었으므로 각 필드 그룹에 대한 테이블을 만듭니다. 이렇게하면 작은 테이블 그룹을 쉽게 선택할 수 있습니다 (큰 테이블에서 작동하지만이 방법으로 더 효율적입니다 … 앞으로이 제한을 해결할 수 있다고 생각합니다 … 더 직관적 임) :
(다음은 의사 코드입니다.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

파일 읽기 및 스토리지 생성 (본질적으로 수행 append_to_multiple) :

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

이제 파일에 모든 테이블이 있습니다 (실제로 원하는 경우 별도의 파일로 저장할 수 있습니다. 파일 이름을 group_map에 추가해야 할 수도 있지만 필요하지는 않습니다).

이것은 열을 가져오고 새로운 열을 만드는 방법입니다.

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

사후 처리 준비가되면 :

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

data_columns 정보, 당신은 실제로 정의 할 필요가 없습니다 모든 data_columns을; 열을 기준으로 행을 하위 선택할 수 있습니다. 예를 들면 다음과 같습니다.

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

최종 보고서 생성 단계에서 가장 흥미로울 수 있습니다 (실제로 데이터 열은 다른 열과 분리되어 있으므로 많이 정의하면 효율성에 다소 영향을 줄 수 있습니다).

다음을 원할 수도 있습니다.

  • 필드 목록을 가져오고 groups_map에서 그룹을 찾은 다음 결과를 연결하여 결과 프레임을 얻는 함수를 만듭니다 (이것은 본질적으로 select_as_multiple의 기능입니다). 이렇게하면 구조가 매우 투명 해집니다.
  • 특정 데이터 열에 대한 인덱스 (행 하위 설정이 훨씬 빨라짐)
  • 압축을 활성화하십시오.

궁금한 점이 있으면 알려주세요!


답변

위의 답변에 내가 찾은 간단한 접근 방식이 누락되었다고 생각합니다.

메모리에로드하기에 너무 큰 파일이 있으면 파일을 여러 개의 작은 파일 (행 또는 열)로 나눕니다.

예 : ~ 30GB 크기의 30 일 분량의 거래 데이터의 경우, 매일 ~ 1GB 크기의 파일로 분할합니다. 나중에 각 파일을 개별적으로 처리하고 마지막에 결과를 집계합니다.

가장 큰 장점 중 하나는 파일 (여러 스레드 또는 프로세스)의 병렬 처리가 가능하다는 것입니다

다른 장점은 파일 조작 (예 : 날짜 추가 / 제거 등)을 일반 쉘 명령으로 수행 할 수 있다는 것입니다. 고급 / 복잡한 파일 형식에서는 불가능합니다.

이 방법은 모든 시나리오를 다루지는 않지만 많은 시나리오에서 매우 유용합니다.


답변

이제 2 년이 지난 지금 ‘핵심’팬더에 해당하는 dask가 있습니다. 훌륭합니다! 모든 팬더 기능을 지원하지는 않지만 실제로 팬더 기능을 사용할 수 있습니다.


답변

데이터 세트가 1GB에서 20GB 사이이면 RAM이 48GB 인 워크 스테이션을 확보해야합니다. 그러면 Pandas는 전체 데이터 세트를 RAM에 보관할 수 있습니다. 나는 당신이 찾고있는 대답이 아니라는 것을 알고 있지만 4GB의 RAM이있는 노트북에서 과학 컴퓨팅을하는 것은 합리적이지 않습니다.


답변

나는 이것이 오래된 스레드라는 것을 알고 있지만 Blaze 라이브러리를 체크 아웃 할 가치가 있다고 생각합니다 . 이러한 유형의 상황을 위해 만들어졌습니다.

문서에서 :

Blaze는 NumPy와 Pandas의 유용성을 분산 및 코어 외부 컴퓨팅으로 확장합니다. Blaze는 NumPy ND-Array 또는 Pandas DataFrame과 유사한 인터페이스를 제공하지만 Postgres 또는 Spark와 같은 다양한 계산 엔진에 이러한 친숙한 인터페이스를 매핑합니다.

편집 : 그건 그렇고, ContinuumIO와 NumPy의 저자 인 Travis Oliphant가 지원합니다.


답변

이것은 pymongo의 경우입니다. 파이썬에서 SQL Server, sqlite, HDF, ORM (SQLAlchemy)을 사용하여 프로토 타입을 만들었습니다. 가장 먼저 pymongo는 문서 기반 DB이므로 각 사람은 문서 ( dict속성)입니다. 많은 사람들이 컬렉션을 형성하고 많은 컬렉션 (사람, 주식 시장, 수입)을 가질 수 있습니다.

pd.dateframe-> pymongo 참고 : chunksizein read_csv을 사용하여 5-10k 레코드로 유지합니다 (더 큰 경우 pymongo가 소켓을 떨어 뜨립니다)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

쿼리 : gt =보다 큼 …

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find()반복자를 반환하므로 일반적 ichunked으로 작은 반복자를자를 때 사용 합니다.

일반적으로 10 개의 데이터 소스를 함께 붙여 넣기 때문에 조인은 어떻습니까?

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

그런 다음 (내 경우에는 때때로 aJoinDF“병합 가능”하기 전에 먼저 agg해야합니다 .)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

그런 다음 아래 업데이트 방법을 통해 새 정보를 기본 컬렉션에 쓸 수 있습니다. (논리 수집 대 물리적 데이터 소스).

collection.update({primarykey:foo},{key:change})

더 작은 조회에서는 비정규 화하면됩니다. 예를 들어, 문서에 코드가 있고 필드 코드 텍스트를 추가하고 dict문서를 작성할 때 찾아 보기 만하면 됩니다.

이제 사람을 중심으로 멋진 데이터 세트가 생겼으며 각 사례에 대한 논리를 발휘하고 더 많은 속성을 만들 수 있습니다. 마지막으로 팬더 3에서 메모리 최대 키 표시기를 읽고 피벗 / agg / 데이터 탐색을 수행 할 수 있습니다. 이것은 숫자 / 큰 텍스트 / 카테고리 / 코드 / 플로트 / 3 백만 레코드에 대해 나를 위해 작동합니다 …

MongoDB에 내장 된 두 가지 방법 (MapReduce 및 집계 프레임 워크)을 사용할 수도 있습니다. MapReduce보다 쉬우 며 빠른 집계 작업에 편리하게 보이는 집계 프레임 워크에 대한 자세한 내용은 여기를 참조하십시오 . 필드 나 관계를 정의 할 필요가 없으며 문서에 항목을 추가 할 수 있습니다. 빠르게 변화하는 numpy, pandas, python 툴셋의 현재 상태에서 MongoDB는 내가 일하는 데 도움이됩니다. 🙂


답변

나는 이것을 늦게 발견했지만 비슷한 문제 (모기지 선금 모델)로 일합니다. 내 해결책은 pandas HDFStore 레이어를 건너 뛰고 똑바로 pytables를 사용하는 것입니다. 최종 파일에서 각 열을 개별 HDF5 배열로 저장합니다.

기본 워크 플로는 먼저 데이터베이스에서 CSV 파일을 가져 오는 것입니다. 나는 그것을 압축하여 그렇게 크지 않다. 그런 다음 파이썬에서 반복하고 각 행을 실제 데이터 형식으로 변환 한 다음 HDF5 파일에 작성하여 행 지향 HDF5 파일로 변환합니다. 수십 분이 걸리지 만 행 단위로만 작동하기 때문에 메모리를 사용하지 않습니다. 그런 다음 행 지향 HDF5 파일을 열 지향 HDF5 파일로 “전치”합니다.

테이블 조옮김은 다음과 같습니다.

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

다시 읽은 다음과 같습니다.

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

이제는 일반적으로 메모리가 많은 컴퓨터에서이를 실행하므로 메모리 사용에 충분히주의하지 않을 수 있습니다. 예를 들어, 기본적으로로드 조작은 전체 데이터 세트를 읽습니다.

이것은 일반적으로 나를 위해 작동하지만 약간 어색하고 멋진 pytables 마술을 사용할 수 없습니다.

편집 : 레코드 배열 pytables 기본값에 비해이 방법의 실제 장점은 테이블을 처리 할 수없는 h5r을 사용하여 R에 데이터를로드 할 수 있다는 것입니다. 또는 적어도 이기종 테이블을로드 할 수 없었습니다.