[postgresql] PostgreSQL에서 UPSERT (MERGE, INSERT … ON DUPLICATE UPDATE)하는 방법은 무엇입니까?

여기서 자주 묻는 질문은 upsert를 수행하는 방법인데, 이는 MySQL 호출 INSERT ... ON DUPLICATE UPDATE과 표준이 MERGE작업의 일부로 지원하는 것입니다.

PostgreSQL이 직접 지원하지 않는 경우 (9.5 페이지 이전) 어떻게이 작업을 수행합니까? 다음을 고려하세요:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

이제 “upsert”튜플 원하는 상상 (2, 'Joe'), (3, 'Alan')새로운 테이블의 내용이 될 수 있도록 :

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

사람들이에 대해 이야기 할 때 이야기하는 내용 upsert입니다. 결정적으로, 모든 테이블 접근 방식은 동일한 테이블에서 여러 트랜잭션이 존재하는 경우 명시 적 잠금을 사용하거나 결과적으로 발생하는 경쟁 조건을 방어함으로써 안전해야합니다.

이 주제는 PostgreSQL의 중복 업데이트에 대한 Insert에서 광범위하게 논의 됩니까? , 그러나 그것은 MySQL 구문의 대안에 관한 것이며 시간이 지남에 따라 관련이없는 세부 사항이 상당히 커졌습니다. 나는 확실한 답변을 위해 노력하고 있습니다.

이러한 기술은 “존재하지 않으면 삽입, 그렇지 않으면 아무것도하지 않음”, 즉 “중복 키 무시시 삽입 …”에도 유용합니다.



답변

9.5 이상 :

PostgreSQL 9.5 및 최신 지원 INSERT ... ON CONFLICT UPDATE(및 ON CONFLICT DO NOTHING), 즉 upsert.

와 비교ON DUPLICATE KEY UPDATE .

빠른 설명 .

사용을위한 참조 매뉴얼 구체적 – conflict_action의 신택스 다이어그램 절 및 해설 텍스트 .

아래에 제공된 9.4 및 이전 버전의 솔루션과 달리이 기능은 여러 개의 충돌하는 행에서 작동하며 독점 잠금 또는 재시도 루프가 필요하지 않습니다.

는 기능을 추가하는 것은 여기에 투입 하고 개발 중심으로 논의가 여기에있다 .


9.5에 있고 이전 버전과 호환되지 않아도되는 경우 지금 읽기를 중지 할 수 있습니다 .


9.4 이상 :

PostgreSQL에는 내장 UPSERT(또는 MERGE) 기능 이 없으므로 동시 사용시 효율적으로 수행하는 것은 매우 어렵습니다.

이 기사는 문제를 유용한 자세하게 설명 합니다.

일반적으로 두 가지 옵션 중에서 선택해야합니다.

  • 재시도 루프의 개별 삽입 / 업데이트 작업 또는
  • 테이블 잠금 및 일괄 병합

개별 행 재시도 루프

많은 연결이 동시에 삽입을 시도하려는 경우 재시도 루프에서 개별 행 업 서트를 사용하는 것이 적합한 옵션입니다.

PostgreSQL 문서에는 데이터베이스 내부 루프에서이를 수행 할 수있는 유용한 절차가 포함되어 있습니다 . 대부분의 순진한 솔루션과 달리 업데이트 손실 및 삽입 레이스를 방지합니다. 그것은 오직 READ COMMITTED모드 에서만 작동하며 그것이 당신이 거래에서하는 유일한 일이라면 안전합니다. 트리거 또는 보조 고유 키로 인해 고유 한 위반이 발생하면이 기능이 제대로 작동하지 않습니다.

이 전략은 매우 비효율적입니다. 실용적 일 때마다 작업을 대기시키고 대신 아래 설명 된대로 대량 업 사트를 수행해야합니다.

이 문제에 대한 많은 시도 된 솔루션은 롤백을 고려하지 않으므로 업데이트가 불완전합니다. 두 거래는 서로 경쟁합니다. 그들 중 하나는 성공적으로 INSERTs; 다른 하나는 중복 키 오류를 가져오고 UPDATE대신 수행합니다. UPDATE(가)에 대한 블록 기다리고 INSERT롤백 또는 커밋. 롤백 할 때 UPDATE조건 재확인은 0 행과 일치하므로 UPDATE커밋이 실제로 예상 한 upsert를 수행하지는 않았습니다. 결과 행 수를 확인하고 필요한 경우 다시 시도해야합니다.

일부 시도 된 솔루션은 SELECT 레이스도 고려하지 않습니다. 당신이 명백하고 간단한 것을 시도한다면 :

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

두 번에 두 번 실행되면 몇 가지 실패 모드가 있습니다. 하나는 업데이트 재확인과 관련하여 이미 논의 된 문제입니다. 다른 하나는 UPDATE동시에 0 행과 일치하고 계속되는 곳입니다. 그리고 둘은 할 EXISTS일이 시험 전에INSERT. 둘 다 0 행을 얻으므로 둘 다 수행합니다 INSERT. 중복 키 오류로 실패합니다.

이것이 재시도 루프가 필요한 이유입니다. 현명한 SQL로 중복 키 오류 또는 업데이트 손실을 방지 할 수 있다고 생각할 수는 있지만 그렇게 할 수는 없습니다. 선택한 개수에 따라 행 개수를 확인하거나 중복 키 오류를 처리하고 다시 시도해야합니다.

이를 위해 자신의 솔루션을 굴리지 마십시오. 메시지 큐잉과 마찬가지로 잘못되었을 수 있습니다.

자물쇠를 가진 대량 upsert

때로는 기존의 기존 데이터 세트에 병합 할 새 데이터 세트가있는 대량 업 세트를 수행하려고합니다. 이는 개별 행 업소 트보다 훨씬 효율적이며 가능할 때마다 선호되어야합니다.

이 경우 일반적으로 다음 프로세스를 따릅니다.

  • CREATETEMPORARY테이블

  • COPY 또는 임시 테이블에 새 데이터를 대량 삽입

  • LOCK목표 테이블 IN EXCLUSIVE MODE. 이를 통해 다른 트랜잭션 SELECT은 테이블을 변경할 수 있지만 테이블을 변경할 수는 없습니다.

  • 수행 UPDATE ... FROM임시 테이블의 값을 사용하여 기존 기록을;

  • 수행 INSERT이미 대상 테이블에 존재하지 않는 행을;

  • COMMIT잠금을 해제합니다.

예를 들어, 질문에 주어진 예제의 경우 다중 값 INSERT을 사용 하여 임시 테이블을 채우십시오.

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

관련 독서

무엇에 대해 MERGE?

SQL 표준은 MERGE실제로 동시성 시맨틱이 잘못 정의되어 있으므로 먼저 테이블을 잠그지 않으면 서 업 사이징에 적합하지 않습니다.

데이터 병합에 유용한 OLAP 문이지만 실제로 동시성 안전 upsert에 유용한 솔루션은 아닙니다. 다른 DBMS를 사용하여 upsert에 사용하는 사람들에게 많은 조언이 MERGE있지만 실제로 잘못되었습니다.

다른 DB :


답변

PostgreSQL 9.5 이전 버전의 단일 삽입 문제에 대한 다른 솔루션을 제공하려고합니다. 아이디어는 단순히 삽입을 먼저 시도하고 레코드가 이미있는 경우 업데이트를 시도하는 것입니다.

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

이 솔루션은 테이블의 행을 삭제하지 않은 경우에만 적용 할 수 있습니다 .

나는이 솔루션의 효율성에 대해 알지 못하지만 충분히 합리적 인 것 같습니다.


답변

다음은 insert ... on conflict ...( pg 9.5+ )에 대한 몇 가지 예입니다 .

  • 충돌시 아무것도 삽입 하지 마십시오 .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
    
  • 충돌 업데이트시 삽입을 통해 열을 통해 충돌 대상을 지정 합니다.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
    
  • 충돌 업데이트시 삽입 은 제한 조건 이름을 통해 충돌 대상을 지정 합니다.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
    

답변

Postgres에 대한 SQLAlchemy upsert> = 9.5

위의 큰 게시물은 Postgres 버전에 대한 다양한 SQL 접근 방식 (문제와 같이 9.5가 아닌 것뿐만 아니라)을 다루므로 Postgres 9.5를 사용하는 경우 SQLAlchemy에서 수행하는 방법을 추가하고 싶습니다. 자체 upsert를 구현하는 대신 SQLAlchemy의 기능 (SQLAlchemy 1.1에 추가 된 기능)을 사용할 수도 있습니다. 개인적으로 가능하다면 이것들을 사용하는 것이 좋습니다. 편의성뿐만 아니라 PostgreSQL이 발생할 수있는 경쟁 조건을 처리 할 수 ​​있기 때문입니다.

어제 나에게 준 다른 답변에서 교차 게시 ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy의 지원 ON CONFLICT두 가지 방법으로 지금 on_conflict_do_update()on_conflict_do_nothing():

설명서에서 복사 :

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert


답변

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

PostgreSQL 9.3에서 테스트


답변

이 질문 이 끝났 으므로 SQLAlchemy를 사용하여 어떻게 수행하는지 여기에 게시하고 있습니다. 재귀를 통해 경쟁 조건 및 유효성 검사 오류 에 맞서기 위해 대량 삽입 또는 업데이트를 재 시도 합니다.

먼저 수입품

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

이제 몇 도우미 기능

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

마지막으로 upsert 함수

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

사용 방법은 다음과 같습니다.

>>> data = [
...     {'id': 1, 'text': 'updated post1'},
...     {'id': 5, 'text': 'updated post5'},
...     {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)

이것이 갖는 장점은 대량 작업bulk_save_objects 과 달리 삽입시 관계, 오류 검사 등을 처리 할 수 ​​있다는 것 입니다.


답변