Django中使用Celery的方法步骤
(一)、概述
Celery是一个简单、灵活和可靠的基于多任务的分布式系统,为运营提供用于维护此系统的工具。专注于实时处理的任务队列,同时也支持任务的调度。执行单元为任务(task),利用多线程这些任务可以被并发的在单个或多个职程(worker)上运行。
Celery通过消息机制通信,通常通过中间人(broker)来分配和调节客户端与职程服务器(worker)之间的通信。客户端发送一条消息,中间人把消息分配给一个职程,最后由职程来负责执行此任务。
Celery可以有多个职程和中间人,这样提高了高可用性和横向的扩展能力
Celery由python语言开发,但是该协议可以用任何语言拉力实现,例如:Django中的Celery、node中的node-celery和php中的celery-php
(二)、Django中使用Celery的流程与配置
导入Celery:pip3installCelery
在与项目同名的目录下创建celery.py文件,特别注意:项目同名的目录下
复制内容到该文件
修改两处内容
- os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')中的proj改为项目名
- app=Celery('pro')中的pro改为项目名
importos fromceleryimportCelery #setthedefaultDjangosettingsmoduleforthe'celery'program. os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings') app=Celery('pro') #Usingastringheremeanstheworkerdoesn'thavetoserialize #theconfigurationobjecttochildprocesses. #-namespace='CELERY'meansallcelery-relatedconfigurationkeys #shouldhavea`CELERY_`prefix. app.config_from_object('django.conf:settings',namespace='CELERY') #LoadtaskmodulesfromallregisteredDjangoappconfigs. app.autodiscover_tasks() @app.task(bind=True) defdebug_task(self): print(f'Request:{self.request!r}')
在与项目同名的目录下的__init__.py文件中添加内容
#Thiswillmakesuretheappisalwaysimportedwhen #Djangostartssothatshared_taskwillusethisapp. from.celeryimportappascelery_app __all__=('celery_app',)
在settings.py文件中添加配置
- CELERY_BROKER_URL:中间人url,可以配置redis或者RabbitMQ
- CELERY_RESULT_BACKEND:返回结果的存储地址
- CELERY_ACCEPT_CONTENT:接收内容的格式,分为两种:json和msgpack。msgpack比json格式的数据体积更小,传输速度更快。
- CELERY_TASK_SERIALIZER:任务载荷的序列化方式-->json
- CELERY_TIMEZONE
- CELERY_TASK_TRACK_STARTED:是否开启任务跟踪
- CELERY_TASK_TIME_LIMIT:任务超时限制
#Celery配置 CELERY_BROKER_URL=env("CELERY_BROKER_URL") CELERY_RESULT_BACKEND=env("CELERY_RESULT_BACKEND") CELERY_ACCEPT_CONTENT=["json","msgpack"] CELERY_TASK_SERIALIZER="json" CELERY_TIMEZONE="Asia/Shanghai" CELERY_TASK_TRACK_STARTED=True CELERY_TASK_TIME_LIMIT=30*60
在app下创建tasks.py文件,创建发送消息功能,任务方法必须添加装饰器:@shared_task
fromrest_framework.responseimportResponse fromrest_framework.genericsimportGenericAPIView fromtimeimportsleep fromceleryimportshared_task classTestView3(GenericAPIView): @classmethod @shared_task defsleep(self,duration): sleep(duration) returnResponse("成功",status=200)
创建视图和路由
###views.py from.tasksimportTestView3 classTestView1(GenericAPIView): defget(self,request): TestView3.sleep(10) returnResponse("celery实验成功") test_view_1=TestView1.as_view() ###urls.py fromdjango.urlsimportpath from.viewsimport( test_view_1 ) urlpatterns=[ path('celery/',test_view_1,name="test1") ]
安装redis并启动
启动django项目
使用Celery命令启动Celery服务,命令:celery-A项目名worker-linfo,如果如下所示则为启动成功.
celery@AppledeMacBook-Air.localv5.0.3(singularity) Darwin-20.1.0-x86_64-i386-64bit2020-12-0520:52:17 [config] .>app:drf_email_project:0x7f84a0c4ad68 .>transport:redis://127.0.0.1:6379/1%20 .>results:redis://127.0.0.1:6379/2 .>concurrency:4(prefork) .>taskevents:OFF(enable-Etomonitortasksinthisworker) [queues] .>celeryexchange=celery(direct)key=celery [tasks] .drf_email_project.celery.debug_task .users.tasks.sleep [2020-12-0520:52:18,166:INFO/MainProcess]Connectedtoredis://127.0.0.1:6379/1%20 [2020-12-0520:52:18,179:INFO/MainProcess]mingle:searchingforneighbors [2020-12-0520:52:19,212:INFO/MainProcess]mingle:allalone [2020-12-0520:52:19,248:WARNING/MainProcess]/Users/apple/drf-email/lib/python3.7/site-packages/celery/fixups/django.py:204:UserWarning:Usingsettings.DEBUGleadstoamemory leak,neverusethissettinginproductionenvironments! leak,neverusethissettinginproductionenvironments!''') [2020-12-0520:52:19,249:INFO/MainProces
到此这篇关于Django中使用Celery的方法步骤的文章就介绍到这了,更多相关Django使用Celery的方法步骤内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!