Introduction
In HTTP, the request/response cycle should be as fast as possible to give a good user experience. Sometimes, however, an endpoint has to perform a time-consuming task (like the mock below), which slows the response. Such a task blocks the request/response cycle until it finishes, leaving the user waiting.
# route for mocking a time-consuming or long-running task
@app.route("/time_consuming_task")
def time_consuming_task():
    # mock some long-running task here
    return ...  # this response is blocked/delayed by the long-running task above
Some examples of long-running tasks are sending email, dumping a database, and generating a PDF of a large report. In such cases, we shouldn't make the user wait until execution completes. Instead, we can add such tasks to a queue and process them in the background.
For task queuing in Flask, we can use Celery (the de facto asynchronous task queue for Python), which offloads long-running tasks to separate worker processes. Celery communicates via messages: a broker (e.g. Redis or RabbitMQ) mediates between the clients (here, the web application) and the Celery workers.
To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. After processing, the result is stored in the result backend.
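The flow described above (client enqueues a message, a worker picks it up, the outcome lands in a backend) can be sketched with the standard library alone. This is only a toy model and not Celery: the names `broker`, `result_backend`, `send_task` and `slow_add` are made up for illustration, and a thread stands in for what would be a separate worker process talking to Redis or RabbitMQ.

```python
import queue
import threading
import uuid

# Toy stand-ins, NOT Celery APIs: an in-memory "broker" and "result backend".
broker = queue.Queue()
result_backend = {}


def send_task(func, *args):
    """Client side: enqueue a message and return immediately with a task id."""
    task_id = str(uuid.uuid4())
    result_backend[task_id] = {"status": "PENDING", "result": None}
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: take messages off the broker and store each outcome."""
    while True:
        message = broker.get()
        if message is None:  # sentinel used to stop the worker
            broker.task_done()
            break
        task_id, func, args = message
        try:
            result_backend[task_id] = {"status": "SUCCESS", "result": func(*args)}
        except Exception as exc:
            result_backend[task_id] = {"status": "FAILURE", "result": str(exc)}
        finally:
            broker.task_done()


def slow_add(a, b):
    """Placeholder for a long-running job."""
    return a + b


worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

task_id = send_task(slow_add, 2, 3)  # returns at once; the work happens elsewhere
broker.join()                        # block here only so the demo can read the result
final_state = result_backend[task_id]

broker.put(None)                     # stop the worker
worker_thread.join()
```

The point to notice is that `send_task` returns a task id before the work is done, which is what lets a web endpoint respond quickly and leave the heavy lifting to the worker.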
Important steps in Windows OS
Note: My Flask application package name is etoolbox, and the tasks are in the module tasks.py.
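First install Celery using: pip install celery. Note that this does not install Redis: the Python client can be pulled in with pip install "celery[redis]", and the Redis server itself has to be installed separately. Then do the following: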
- start the Redis broker (double-click redis-server.exe)
- start a Celery worker in a separate shell: celery -A etoolbox.tasks.celery worker --loglevel=DEBUG
- start the beat scheduler in a separate shell: celery -A etoolbox.tasks.celery beat
I put each of these steps in its own file:
- celery1_start_redis_server.exe (I just changed the name from redis_server)
- celery2_start_beat.bat (for the beat scheduler)
- celery3_start_worker.bat (for the worker)
To make things easier, I created another batch file with the following content. Clicking this batch file would do everything required.
REM the first line should run only on dev
REM && set flask_debug=1
if %CD%==D:\etoolbox start cmd /k "venv\scripts\activate && set flask_app=etoolbox && set flask_env=development && flask run"
start cmd /k "celery1_start_redis_server.exe"
start cmd /k "celery2_start_beat.bat"
start cmd /k "celery3_start_worker.bat"
NOTE: In production, you may schedule the above batch file to run at system startup
To daemonize Celery on Windows, I simply created batch files that run the worker and beat (the scheduler), and this works quite well. A daemon is a program that runs continuously in the background to handle periodic service requests, forwarding them to other programs (or processes) as appropriate. For instance, every web server runs an HTTP daemon (HTTPD) that continually waits for requests from web clients and their users.
Below, I show an example of using Celery for database backup (dump), which is often a long-running task.
Celery task function (in tasks.py)
# This task (function) is decorated with celery.task to make it a background job.
# (A decorator adds some functionality to an object/function.)
# The function itself is executed by the Celery worker.
import os
import subprocess

@celery.task(name="databaseBackup")
@disable_job  # some user-defined decorator
def databaseBackup():
    from . import get_emails
    developer_email, sender_email = get_emails()
    dumpfile = os.path.join(basedir, app.config['BACKUP_FOLDER'], 'dump.sql')
    command = 'mysqldump --quote-names --no-create-db --no-create-info --skip-add-locks --skip-disable-keys --skip-comments --skip-set-charset --complete-insert --dump-date=FALSE -q -ugrails -pserver nomad1 > %s' % dumpfile
    # subprocess.run returns a CompletedProcess, which is always truthy, so check the exit code
    completed = subprocess.run(command, shell=True)
    if completed.returncode != 0:
        return {"status": False}
    # now email the file
    to = developer_email
    html = 'Please see attached backup file'
    sendEmail.apply_async(args=[to, html, [dumpfile]], kwargs={'subject': 'Database Backup'}, countdown=1)
    return {"status": True}
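One detail worth spelling out for a task that shells out like this: `subprocess.run` returns a `CompletedProcess` object, and that object is truthy whatever the exit code was, so a bare `if subprocess.run(...)` cannot tell success from failure. The sketch below shows the difference using harmless Python one-liners in place of mysqldump; the helper name `run_ok` is hypothetical.

```python
import subprocess
import sys


def run_ok(command):
    """Return True only if the command exits with status 0."""
    completed = subprocess.run(command)
    return completed.returncode == 0


# Stand-ins for a failing and a succeeding external command.
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]
succeeding = [sys.executable, "-c", "pass"]

always_truthy = bool(subprocess.run(failing))  # truthy even though the command failed
failure_detected = not run_ok(failing)
success_detected = run_ok(succeeding)
```

An alternative is to pass `check=True`, in which case `subprocess.run` raises `CalledProcessError` on a non-zero exit code instead of returning normally.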
Manually trigger a Celery task (inside a route endpoint)
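# the route endpoint
from celery.result import AsyncResult

# the first rule lets the POST in the fetch example below (which has no task id) reach this view
@app.route("/time_consuming_task", defaults={"task_id": None}, methods=["POST"])
@app.route("/time_consuming_task/<task_id>", methods=["GET", "POST"])
def time_consuming_task(task_id):
    ...
    if request.method == 'POST':
        task = databaseBackup.apply_async(args=[], countdown=0)  # we can pass args and kwargs
        return jsonify({"task_id": task.id}), 202
    # result and status not tested
    task = AsyncResult(task_id)
    return jsonify({"task_id": task_id, "task_result": task.result, "task_status": task.status}), 200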
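Since the status branch is marked as untested, one thing to watch for is that the result of a failed task is the exception instance raised by the task, which a JSON encoder cannot serialize. A minimal sketch of a helper that makes the payload JSON-safe is shown below; `build_status_payload` is a hypothetical name, and the state strings are passed in by hand here rather than read from a real `AsyncResult`.

```python
import json


def build_status_payload(task_id, state, result):
    """Hypothetical helper: make a task's state and result safe to serialize as JSON."""
    if isinstance(result, BaseException):
        # Failed tasks carry the raised exception as their result; turn it into text.
        result = f"{type(result).__name__}: {result}"
    return {"task_id": task_id, "task_status": state, "task_result": result}


pending = build_status_payload("abc", "PENDING", None)
failed = build_status_payload("abc", "FAILURE", ValueError("dump failed"))
failed_json = json.dumps(failed)  # serializes without raising TypeError
```

In the route, this would be called with `task.status` and `task.result` before handing the dictionary to `jsonify`.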
Schedule a Celery task using beat (in tasks.py)
from celery.schedules import crontab

celery.conf.beat_schedule = {
    'create_task': {
        'task': 'create_task',
        #'schedule': 60.0,  # every 60 seconds
        #'schedule': crontab(hour=7, minute=30, day_of_week=1),  # every Monday at 7:30 a.m.
        'schedule': crontab(minute='*', hour='*', day_of_week='sun'),  # every minute on Sunday
        'args': ('1',),  # the trailing comma makes this a one-element tuple
    },
    'databaseBackup': {
        'task': 'databaseBackup',
        #'schedule': 300.0,  # every 5 minutes
        #'schedule': crontab(hour=7, minute=30, day_of_week=1),  # every Monday at 7:30 a.m.
        #'schedule': crontab(minute='*', hour='*', day_of_week='sat-sun'),  # every minute on Saturday and Sunday
        'schedule': crontab(hour=9, minute=15, day_of_week='sun'),  # every Sunday at 9:15 a.m.
        #'args': ()
    },
}
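The day_of_week values in the schedules above follow cron numbering, where 0 is Sunday and 1 is Monday, whereas Python's `datetime.weekday()` starts at Monday = 0. The sketch below is a deliberately simplified matcher, not Celery's crontab implementation, that only supports single integers or "any" per field; `cron_weekday` and `matches` are illustrative names.

```python
from datetime import datetime


def cron_weekday(dt):
    """Convert Python's weekday (Monday=0) to cron numbering (Sunday=0)."""
    return (dt.weekday() + 1) % 7


def matches(dt, minute=None, hour=None, day_of_week=None):
    """Simplified matcher: None means 'any', otherwise the field must be equal."""
    return (
        (minute is None or dt.minute == minute)
        and (hour is None or dt.hour == hour)
        and (day_of_week is None or cron_weekday(dt) == day_of_week)
    )


sunday_0915 = datetime(2024, 1, 7, 9, 15)  # a Sunday
monday_0730 = datetime(2024, 1, 8, 7, 30)  # a Monday

backup_due = matches(sunday_0915, minute=15, hour=9, day_of_week=0)  # Sunday 9:15 schedule
monday_due = matches(monday_0730, minute=30, hour=7, day_of_week=1)  # Monday 7:30 schedule
not_due = matches(monday_0730, minute=15, hour=9, day_of_week=0)
```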
Checking the progress of a task by polling the server
When a task is triggered manually, the client sends a request to the server for an action such as a database backup. It therefore makes sense for the client to keep polling the server to check the status of the task. See the Fetch request below.
fetch('/time_consuming_task', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json'
  },
  // body: JSON.stringify({ type: type }),
})
  .then(response => response.json())
  .then(data => getStatus(data.task_id));
For polling to work, the server sends the task id (task.id) back to the client after queueing the task. Using Fetch or AJAX, the client then polls the server at a defined interval (using setTimeout) to check the status of the task, while the task itself runs in the background.
function getStatus(taskID) {
  ...
  fetch(`/time_consuming_task/${taskID}`, {
    method: 'GET',
    headers: { 'Content-Type': 'application/json' },
  })
    .then(response => response.json())  // parse the JSON body before reading its fields
    .then(res => {
      const taskStatus = res.task_status;
      if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
      setTimeout(function() { getStatus(res.task_id); }, 1000);
    });
}
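The same polling logic can be sketched in Python, for example for a script or a test that is not running in a browser. Everything here is illustrative: `poll_until_done` and `fake_fetch_status` are hypothetical names, and the fake function stands in for an HTTP GET to the status endpoint. Unlike the browser version, this sketch caps the number of attempts so that a stuck task does not cause endless polling.

```python
import time

TERMINAL_STATES = {"SUCCESS", "FAILURE"}


def poll_until_done(fetch_status, task_id, interval=1.0, max_attempts=30, sleep=time.sleep):
    """Call fetch_status(task_id) until a terminal state is seen or attempts run out."""
    for _ in range(max_attempts):
        status = fetch_status(task_id)
        if status["task_status"] in TERMINAL_STATES:
            return status
        sleep(interval)
    return None  # gave up; the task may still be running


# Fake server for demonstration: reports PENDING, then STARTED, then SUCCESS.
responses = iter(["PENDING", "STARTED", "SUCCESS"])


def fake_fetch_status(task_id):
    return {"task_id": task_id, "task_status": next(responses), "task_result": None}


final = poll_until_done(fake_fetch_status, "abc", interval=0, sleep=lambda seconds: None)
```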