[python] SQLAlchemy에서 OR 사용

문서를 살펴본 결과 SQLAlchemy에서 OR 쿼리를 수행하는 방법을 찾지 못하는 것 같습니다. 이 쿼리를 수행하고 싶습니다.

SELECT address FROM addressbook WHERE city='boston' AND (lastname='bulger' OR firstname='whitey')

같은 것이어야합니다

addr = session.query(AddressBook).filter(City == "boston").filter(????)



답변

로부터 튜토리얼 :

from sqlalchemy import or_
filter(or_(User.name == 'ed', User.name == 'wendy'))


답변

SQLAlchemy의는 비트 연산자를 오버로드 &, |그리고 ~그래서 대신 추한 어려운 읽기 접두사 구문 or_()and_()(에 같은 Bastien의 대답 ) 당신이이 연산자를 사용할 수 있습니다 :

.filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))

비트 연산자의 우선 순위로 인해 괄호는 선택 사항아닙니다 .

따라서 전체 쿼리는 다음과 같습니다.

addr = session.query(AddressBook) \
    .filter(AddressBook.city == "boston") \
    .filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))


답변

or_() OR 쿼리 구성 요소의 수를 알 수없는 경우이 기능이 유용 할 수 있습니다.

예를 들어, 옵션 필터가 거의없는 REST 서비스를 작성한다고 가정 해 봅시다. 필터가 true를 리턴하면 레코드를 리턴해야합니다. 반면 요청에서 매개 변수가 정의되지 않은 경우 쿼리가 변경되지 않아야합니다. or_()기능이 없으면 다음과 같이해야합니다.

query = Book.query
if filter.title and filter.author:
    query = query.filter((Book.title.ilike(filter.title))|(Book.author.ilike(filter.author)))
else if filter.title:
    query = query.filter(Book.title.ilike(filter.title))
else if filter.author:
    query = query.filter(Book.author.ilike(filter.author))

or_()기능을 사용하면 다음과 같이 다시 작성할 수 있습니다.

query = Book.query
not_null_filters = []
if filter.title:
    not_null_filters.append(Book.title.ilike(filter.title))
if filter.author:
    not_null_filters.append(Book.author.ilike(filter.author))

if len(not_null_filters) > 0:
    query = query.filter(or_(*not_null_filters))


답변

이것은 정말 도움이되었습니다. 주어진 테이블에 대한 구현은 다음과 같습니다.

def sql_replace(self, tableobject, dictargs):

    #missing check of table object is valid
    primarykeys = [key.name for key in inspect(tableobject).primary_key]

    filterargs = []
    for primkeys in primarykeys:
        if dictargs[primkeys] is not None:
            filterargs.append(getattr(db.RT_eqmtvsdata, primkeys) == dictargs[primkeys])
        else:
            return

    query = select([db.RT_eqmtvsdata]).where(and_(*filterargs))

    if self.r_ExecuteAndErrorChk2(query)[primarykeys[0]] is not None:
        # update
        filter = and_(*filterargs)
        query = tableobject.__table__.update().values(dictargs).where(filter)
        return self.w_ExecuteAndErrorChk2(query)

    else:
        query = tableobject.__table__.insert().values(dictargs)
        return self.w_ExecuteAndErrorChk2(query)

# example usage
inrow = {'eqmtvs_id': eqmtvsid, 'datetime': dtime, 'param_id': paramid}

self.sql_replace(tableobject=db.RT_eqmtvsdata, dictargs=inrow)


답변