使用celery和Django处理异步任务的流程分析
介绍
我们可能需要一些可以安排一些任务并定期运行一些任务或异步处理长任务的东西,而这一切都可以通过在DjangoProject中使用Celery来实现。
什么是Celery?
Celery是一个专注于实时处理的任务队列,它还支持任务调度。Celery快速,简单,高度可用且灵活。
Celery需要消息传输来发送和接收消息,这可以由Redis或RabbitMQ完成。
入门
让我们开始在您的virtualenv中安装Celery软件包。
安装Celery
$pip installcelery pipinstallcelery
安装Redis
我们将MessageBroker用作Redis,所以我们安装
Linux/Mac用户
您可以从这里下载最新版本
$wgethttp://download.redis.io/releases/redis-4.0.8.tar.gz $tarxzfredis-4.0.8.tar.gz $cdredis-4.0.8 $make
Windows用户
对于Windows用户,您可以从此处获取redis的可执行文件。
安装后,请尝试是否正确安装。
$redis-cliping
它应该显示:
pong
同时安装redis的python包
$pipinstallredis
Django的第一步
现在您已经成功安装了软件包,现在就开始学习DjangoProject
settings.pyAddsomeofthesettingconfigurationinyoursettings.pyCELERY_BROKER_URL='redis://localhost:6379'CELERY_RESULT_BACKEND='redis://localhost:6379'CELERY_ACCEPT_CONTENT=['application/json']CELERY_TASK_SERIALIZER='json'CELERY_RESULT_SERIALIZER='json'CELERY_TIMEZONE="YOUR_TIMEZONE"
确保您已从YOUR_TIMEZONE更改时区。您可以从这里获取时区
主Django项目目录中创建celery.py文件
-src/-manage.py-celery_project/-__init__.py-settings.py-urls.py-celery.pycelery_project/celery.py
在celery.py模块中添加以下代码。该模块用于定义celery实例。
确保已使用django项目名称更改了项目名称(
from__future__importabsolute_import,unicode_literals
importos
fromceleryimportCelery
#setthedefaultDjangosettingsmoduleforthe'celery'program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','.settings')
app=Celery('')
#Usingastringheremeanstheworkerdoesn'thavetoserialize
#theconfigurationobjecttochildprocesses.
#-namespace='CELERY'meansallcelery-relatedconfigurationkeys
#shouldhavea`CELERY_`prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')
#LoadtaskmodulesfromallregisteredDjangoappconfigs.
app.autodiscover_tasks()
@app.task(bind=True)
defdebug_task(self):
print('Request:{0!r}'.format(self.request))
celery_project/__init__.py
然后,我们需要将定义celery.py的应用程序导入到主项目目录的__init__.py。这样,我们可以确保在Django项目启动时已加载应用
from__future__importabsolute_import,unicode_literals #Thiswillmakesuretheappisalwaysimportedwhen #Djangostartssothatshared_taskwillusethisapp. from.celeryimportappascelery_app __all__=['celery_app']
创建任务
现在创建一些任务
在您在INSTALLED_APPS中注册的任何应用程序中创建一个新文件
my_app/tasks.py
from__future__importabsolute_import,unicode_literals
fromceleryimportshared_task
@shared_task(name="print_msg_with_name")
defprint_message(name,*args,**kwargs):
print("Celeryisworking!!{}haveimplementeditcorrectly.".format(name))
@shared_task(name="add_2_numbers")
defadd(x,y):
print("Addfunctionhasbeencalled!!withparams{},{}".format(x,y))
returnx+y
开始程序
打开一个NEW终端并运行以下命令以运行celery的worker实例,并将目录更改为您的主项目目录所在的位置,即,将manage.py文件放置的目录,并确保您已经激活您的virtualenv(如果已创建)。
用您的项目名称更改项目名称
$celery-A
输出:
--------------celery@rootv4.1.0(latentcall)----****--------*****--Linux-4.13.0-32-generic-x86_64-with-Ubuntu-17.10-artful2018-02-1708:09:37--*-****----**----------[config]-**----------.>app:celery_project:0x7f9039886400-**----------.>transport:redis://localhost:6379//-**----------.>results:redis://localhost:6379/-***---*---.>concurrency:4(prefork)--*******----.>taskevents:OFF(enable-Etomonitortasksinthisworker)---*****-------------------[queues].>celeryexchange=celery(direct)key=celery[tasks].add_2_numbers.celery_project.celery.debug_task.print_msg_with_name[2018-02-1708:09:37,877:INFO/MainProcess]Connectedtoredis://localhost:6379//[2018-02-1708:09:37,987:INFO/MainProcess]mingle:searchingforneighbors[2018-02-1708:09:39,084:INFO/MainProcess]mingle:allalone[2018-02-1708:09:39,121:WARNING/MainProcess]/home/jai/Desktop/demo/lib/python3.6/site-packages/celery/fixups/django.py:202:UserWarning:Usingsettings.DEBUGleadstoamemoryleak,neverusethissettinginproductionenvironments!warnings.warn('Usingsettings.DEBUGleadstoamemoryleak,never'[2018-02-1708:09:39,121:INFO/MainProcess]celery@rootready.
注意:检查上面的[tasks],它应该包含您在task.py模块中创建的任务的名称。
有关更多信息和日志,您还可以在DEBUGMODE中运行worker实例
celery
-A worker -linfo --loglevel =DEBUGcelery-A worker-linfo--loglevel=DEBUG
注意:请勿关闭此终端,应保持打开状态!!
测试任务
现在让我们从djangoshell运行任务打开Djangoshell
$python3manage.pyshell
用delay方法运行函数:
>>>frommy_app.tasksimportprint_message,add
>>>print_message.delay("JaiSinghal")
>>>add.delay(10,20)
当检查您的celeryworker实例正在运行的第二个终端时,您将获得此类型的输出,显示您的任务已收到且任务已成功完成
[2018-02-1708:12:14,375:INFO/MainProcess]Receivedtask:my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760][2018-02-1708:12:14,377:WARNING/ForkPoolWorker-4]Celeryisworking!!JaiSinghalhaveimplementeditcorrectly.[2018-02-1708:12:14,382:INFO/ForkPoolWorker-4]Taskmy_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760]succeededin0.004476275000342866s:None[2018-02-1708:12:28,344:INFO/MainProcess]Receivedtask:my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30][2018-02-1708:12:28,349:WARNING/ForkPoolWorker-3]Addfunctionhasbeencalled!!withparams10,20[2018-02-1708:12:28,358:INFO/ForkPoolWorker-3]Taskmy_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30]succeededin0.010077004999857309s:30
总结
以上所述是小编给大家介绍的使用celery和Django处理异步任务的流程分析,希望对大家有所帮助,也非常感谢大家对毛票票网站的支持!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。