[python] Flask에서 비동기 작업 만들기

저는 Flask에서 응용 프로그램을 작성 중이며 WSGI동기 및 차단을 제외하고 는 정말 잘 작동합니다 . 특히 타사 API를 호출하는 하나의 작업이 있으며 해당 작업을 완료하는 데 몇 분 정도 걸릴 수 있습니다. 나는 그 전화를 걸고 (실제로 일련의 전화) 그것을 실행시키고 싶습니다. 제어권은 Flask로 돌아갑니다.

내 견해는 다음과 같습니다.

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

이제 제가하고 싶은 것은

final_file = audio_class.render_audio()

Flask가 요청을 계속 처리 할 수있는 동안 메서드가 반환 될 때 실행할 콜백을 실행하고 제공합니다. 이것은 비동기 적으로 실행하기 위해 Flask가 필요한 유일한 작업이며이를 구현하는 가장 좋은 방법에 대한 조언을 원합니다.

Twisted와 Klein을 살펴 보았지만 Threading으로 충분할 수 있으므로 과잉인지 확실하지 않습니다. 아니면 셀러리가 좋은 선택일까요?



답변

저는 Celery 를 사용하여 비동기 작업을 처리합니다. 작업 대기열로 사용할 브로커를 설치해야합니다 (RabbitMQ 및 Redis 권장).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Flask 앱을 ​​실행하고 다른 프로세스를 시작하여 셀러리 작업자를 실행합니다.

$ celery worker -A app.celery --loglevel=debug

또한 Flask와 함께 Celery를 사용 하는 방법 에 대한보다 심층적 인 가이드를 위해 Miguel Gringberg의 글을 참조 할 것 입니다.


답변

스레딩은 또 다른 가능한 솔루션입니다. Celery 기반 솔루션은 대규모 애플리케이션에 더 적합하지만 해당 엔드 포인트에서 너무 많은 트래픽을 예상하지 않는 경우 스레딩이 실행 가능한 대안입니다.

이 솔루션은 Miguel Grinberg의 PyCon 2016 Flask at Scale 프레젠테이션 , 특히 슬라이드 데크의 슬라이드 41 을 기반으로 합니다. 그의 코드는 원본 소스에 관심이있는 사람들을 위해 github에서도 사용할 수 있습니다 .

사용자 관점에서 코드는 다음과 같이 작동합니다.

  1. 장기 실행 작업을 수행하는 끝점을 호출합니다.
  2. 이 끝점은 작업 상태를 확인하는 링크와 함께 202 Accepted를 반환합니다.
  3. 상태 링크에 대한 호출은 taks가 아직 실행중인 동안 202를 반환하고 작업이 완료되면 200 (및 결과)을 반환합니다.

api 호출을 백그라운드 작업으로 변환하려면 @async_api 데코레이터를 추가하기 만하면됩니다.

다음은 완전히 포함 된 예입니다.

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)


답변

multiprocessing.Process와 함께 사용해 볼 수도 있습니다 daemon=True. 이 process.start()메서드는 차단되지 않으며 비싼 함수가 백그라운드에서 실행되는 동안 호출자에게 즉시 응답 / 상태를 반환 할 수 있습니다.

falcon 프레임 워크로 작업하고 daemon프로세스를 사용 하는 동안 비슷한 문제가 발생했습니다 .

다음을 수행해야합니다.

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

즉시 응답을 받아야하며 10 초 후에 콘솔에 인쇄 된 메시지가 표시되어야합니다.

참고 : daemonic프로세스는 자식 프로세스를 생성 할 수 없습니다.


답변