저는 Flask에서 응용 프로그램을 작성 중이며 WSGI
동기 및 차단을 제외하고 는 정말 잘 작동합니다 . 특히 타사 API를 호출하는 하나의 작업이 있으며 해당 작업을 완료하는 데 몇 분 정도 걸릴 수 있습니다. 나는 그 전화를 걸고 (실제로 일련의 전화) 그것을 실행시키고 싶습니다. 제어권은 Flask로 돌아갑니다.
내 견해는 다음과 같습니다.
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
이제 제가하고 싶은 것은
final_file = audio_class.render_audio()
Flask가 요청을 계속 처리 할 수있는 동안 메서드가 반환 될 때 실행할 콜백을 실행하고 제공합니다. 이것은 비동기 적으로 실행하기 위해 Flask가 필요한 유일한 작업이며이를 구현하는 가장 좋은 방법에 대한 조언을 원합니다.
Twisted와 Klein을 살펴 보았지만 Threading으로 충분할 수 있으므로 과잉인지 확실하지 않습니다. 아니면 셀러리가 좋은 선택일까요?
답변
저는 Celery 를 사용하여 비동기 작업을 처리합니다. 작업 대기열로 사용할 브로커를 설치해야합니다 (RabbitMQ 및 Redis 권장).
app.py
:
from flask import Flask
from celery import Celery
broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py
@celery.task(bind=True)
def some_long_task(self, x, y):
# Do some long task
...
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables
return Response(
mimetype='application/json',
status=200
)
Flask 앱을 실행하고 다른 프로세스를 시작하여 셀러리 작업자를 실행합니다.
$ celery worker -A app.celery --loglevel=debug
또한 Flask와 함께 Celery를 사용 하는 방법 에 대한보다 심층적 인 가이드를 위해 Miguel Gringberg의 글을 참조 할 것 입니다.
답변
스레딩은 또 다른 가능한 솔루션입니다. Celery 기반 솔루션은 대규모 애플리케이션에 더 적합하지만 해당 엔드 포인트에서 너무 많은 트래픽을 예상하지 않는 경우 스레딩이 실행 가능한 대안입니다.
이 솔루션은 Miguel Grinberg의 PyCon 2016 Flask at Scale 프레젠테이션 , 특히 슬라이드 데크의 슬라이드 41 을 기반으로 합니다. 그의 코드는 원본 소스에 관심이있는 사람들을 위해 github에서도 사용할 수 있습니다 .
사용자 관점에서 코드는 다음과 같이 작동합니다.
- 장기 실행 작업을 수행하는 끝점을 호출합니다.
- 이 끝점은 작업 상태를 확인하는 링크와 함께 202 Accepted를 반환합니다.
- 상태 링크에 대한 호출은 taks가 아직 실행중인 동안 202를 반환하고 작업이 완료되면 200 (및 결과)을 반환합니다.
api 호출을 백그라운드 작업으로 변환하려면 @async_api 데코레이터를 추가하기 만하면됩니다.
다음은 완전히 포함 된 예입니다.
from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid
tasks = {}
app = Flask(__name__)
api = Api(app)
@app.before_first_request
def before_first_request():
"""Start a background thread that cleans up old tasks."""
def clean_old_tasks():
"""
This function cleans up old tasks from our in-memory data structure.
"""
global tasks
while True:
# Only keep tasks that are running or that finished less than 5
# minutes ago.
five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
tasks = {task_id: task for task_id, task in tasks.items()
if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
time.sleep(60)
if not current_app.config['TESTING']:
thread = threading.Thread(target=clean_old_tasks)
thread.start()
def async_api(wrapped_function):
@wraps(wrapped_function)
def new_function(*args, **kwargs):
def task_call(flask_app, environ):
# Create a request context similar to that of the original request
# so that the task can have access to flask.g, flask.request, etc.
with flask_app.request_context(environ):
try:
tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
except HTTPException as e:
tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
except Exception as e:
# The function raised an exception, so we set a 500 error
tasks[task_id]['return_value'] = InternalServerError()
if current_app.debug:
# We want to find out if something happened so reraise
raise
finally:
# We record the time of the response, to help in garbage
# collecting old tasks
tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())
# close the database session (if any)
# Assign an id to the asynchronous task
task_id = uuid.uuid4().hex
# Record the task, and then launch it
tasks[task_id] = {'task_thread': threading.Thread(
target=task_call, args=(current_app._get_current_object(),
request.environ))}
tasks[task_id]['task_thread'].start()
# Return a 202 response, with a link that the client can use to
# obtain task status
print(url_for('gettaskstatus', task_id=task_id))
return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
return new_function
class GetTaskStatus(Resource):
def get(self, task_id):
"""
Return status about an asynchronous task. If this request returns a 202
status code, it means that task hasn't finished yet. Else, the response
from the task is returned.
"""
task = tasks.get(task_id)
if task is None:
abort(404)
if 'return_value' not in task:
return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
return task['return_value']
class CatchAll(Resource):
@async_api
def get(self, path=''):
# perform some intensive processing
print("starting processing task, path: '%s'" % path)
time.sleep(10)
print("completed processing task, path: '%s'" % path)
return f'The answer is: {path}'
api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')
if __name__ == '__main__':
app.run(debug=True)
답변
multiprocessing.Process
와 함께 사용해 볼 수도 있습니다 daemon=True
. 이 process.start()
메서드는 차단되지 않으며 비싼 함수가 백그라운드에서 실행되는 동안 호출자에게 즉시 응답 / 상태를 반환 할 수 있습니다.
falcon 프레임 워크로 작업하고 daemon
프로세스를 사용 하는 동안 비슷한 문제가 발생했습니다 .
다음을 수행해야합니다.
from multiprocessing import Process
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
heavy_process = Process( # Create a daemonic process with heavy "my_func"
target=my_func,
daemon=True
)
heavy_process.start()
return Response(
mimetype='application/json',
status=200
)
# Define some heavy function
def my_func():
time.sleep(10)
print("Process finished")
즉시 응답을 받아야하며 10 초 후에 콘솔에 인쇄 된 메시지가 표시되어야합니다.
참고 : daemonic
프로세스는 자식 프로세스를 생성 할 수 없습니다.