通过celery异步处理一个查询任务的完整代码
今天介绍通过celery实现一个异步任务。有这样一个需求,前端发起一个查询的请求,但是发起查询后,查询可能不会立即返回结果。这时候,发起查询后,后端可以把这次查询当作一个task,并立即返回一个能唯一表明该task的值,如taskID(用户后面可以通过这个taskID随时查看结果),用户收到这个taskID后,可以转去处理其他任务,而不必一直等待查询结果。后端API调用celery来处理这个task,并将结果值保存在一个csv文件中,后面用户通过taskID查询时返回结果。
defapplication(environ,start_response):
"""部分代码省略"""
query_string=environ['QUERY_STRING']
serviceGroupName=""
forgetParaminquery_string.split("&"):
params=getParam.split("=")
resultInfo=""
ifparams[0]=="type":
alertType=params[1]
elifparams[0]=="projectName":
projectName=params[1]
elifparams[0]=="serviceGroupName":
serviceGroupName=params[1]
else:
resultInfo=error_info(-1,"GET参数只能为type=>&projectName=>&serviceGroupName=>;必须指定三个参数",{})
return[resultInfo]
taskId=1
result_file_name='/var/www/dba_api/api/test/'+str(taskId)+'.csv'
contentInfo=json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort})
result=getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo")
taskInfo="任务已经创建,详情请查看:http://10.4.34.254/api/task?taskId=%s"%(taskId)
return[resultInfo]
getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重点是这一行,apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。
要使用此功能,需要提供结果后台(resultbackend),这样才有地方存储任务状态等信息。其中,getServiceInfo是自定义的一个task,后续会介绍到,contentInfo是传递的一个参数,queue是指定队列名称。
上面这个函数的原型如下:
task.apply_async(args[,kwargs[,…]])
其中args和kwargs分别是task接收的参数,当然它也接受额外的参数对任务进行控制。
在Celery中执行任务的方法一共有三种:
1.delay,用来进行最简单便捷的任务执行(delay在第3小节的测试中使用过,它可以看作是apply_async的一个快捷方式);
2.apply_async,对于任务的执行附加额外的参数,对任务进行控制;
3.app.send_task,可以执行未在Celery中进行注册的任务。
celery文件配置
在python的库存放路径中(一般是/usr/lib/python2.6/site-packages),创建一个文件夹proj,进入proj目录,创建三个文件,init,将proj声明一个python包,celepy,其内容如下:
#_*_coding:utf-8_*_
from__future__importabsolute_import
fromceleryimportCelery
app=Celery("proj",
broker="amqp://user:password@localhost//",
backend="amqp",
include=["proj.tasks"]
)
app.conf.update(
CELERY_ROUTES={
"proj.tasks.getServerInfo":{"queue":"getServerInfo"},
}
)
if__name__=="__main__":
app.start()
这里我们定义了模块名称proj以及celery路由。
还有一个文件,task.py
#_*_coding:utf-8_*_i
from__future__importabsolute_import
fromproj.celeryimportapp
importrandom
importsimplejsonasjson
importtypes
importtime
importMySQLdb
importurllib2
importConfigParserascparser
importhmac
importhashlib
importbase64
@app.task
defgetServiceInfo(contentInfo):
contentInfo=json.loads(contentInfo)
serviceGroupName=contentInfo['serviceGroupName']
dbHost=contentInfo['dbHost']
dbPort=int(contentInfo['dbPort'])
dbUser=contentInfo['dbUser']
dbPasswd=contentInfo['dbPasswd']
msgLib=MessageLib.MessageLib()
Sql="YourSQL"
#第三步:连接数据库,执行代码逻辑
try:
db_connection=MySQLdb.connect(host=dbHost,port=dbPort,passwd=dbPasswd,db="cmdb",user=dbUser,connect_timeout=2,charset="utf8")
cursor=db_connection.cursor()
cursor.execute(getServiceGroupHostSql)
row=cursor.fetchall()
result=[]
forlineinrow:
...
result.append(tempMysqlHighInfo)
resultInfo=msgLib.success_info(result)
returnresultInfo
exceptException,e:
raise
errorInfo="dbhost:%s,port:%s,error:%s"%(dbHost,dbPort,str(e))
#returngetServiceGroupHostSql,errorInfo
returnmsgLib.error_info(-1,errorInfo,{})
启动celery
celery-Aprojworker-QgetServiceInfo-ldebug-c6
最后,写一个结果,专门获取查询结果的结果,传入的参数为taskID,部分代码如下:
defapplication(environ,start_response):
status='400ERROR'
response_headers=[('Content-type','application/json;charset=utf-8')]
start_response(status,response_headers)
status='200OK'
response_headers=[('Content-type','application/json;charset=utf-8')]
start_response(status,response_headers)
ifenviron['REQUEST_METHOD']!="GET":
resultInfo=msgLib.error_info(-1,"http请求类型不是GET",{})
return[resultInfo]
query_string=environ['QUERY_STRING']
serviceGroupName=""
forgetParaminquery_string.split("&"):
params=getParam.split("=")
resultInfo=""
ifparams[0]=="taskId":
taskId=params[1]
else:
resultInfo=msgLib.error_info(-1,"GET参数无比指定taskId这个参数",{})
return[resultInfo]
logging.info(query_string)
result_file_name='/var/www/dba_api/api/test/'+str(taskId)+'.csv'
result=[]
try:
withopen(result_file_name,'rb')asfp:
lines=csv.reader(fp)
forlineinlines:
result.append(line)
resultInfo=msgLib.success_info(result)
returnresultInfo
exceptException,e:
errorInfo="somethingwrong"
returnmsgLib.error_info(-1,errorInfo,{})
以上这篇通过celery异步处理一个查询任务的完整代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。