How to listen for Salt job status in Python and push task data to Redis
After a salt command is dispatched, the completed task data is proactively pushed into Redis, using a Redis producer-style message queue to deliver the results.
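The daemon below consumes JSON messages from the Redis list task:prod:queue; every message carries a type field (pushTaskData, setTaskState or setTaskData) together with the salt job id (jid). As a rough sketch of what a producer looks like from the outside, assuming a plain redis-py client instead of the project's RedisPool wrapper (host, port and the jid are placeholders):

# Producer sketch (assumption: redis-py StrictRedis; the real script below uses
# the project's RedisPool wrapper instead).
import json
import redis

r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)

# Ask the queue daemon to flush the cached results of a job to the backend;
# the message layout matches what RedisQueueDaemon.run_task() expects.
r.lpush('task:prod:queue', json.dumps({
    'type': 'pushTaskData',
    'jid': '20190101000000000000',
}))

The full listener script follows.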
# coding=utf-8
import fnmatch, json, logging
import salt.config
import salt.utils.event
from salt.utils.redis import RedisPool
import sys, os, datetime, random
import multiprocessing, threading
from joi.utils.gobsAPI import PostWeb

logger = logging.getLogger(__name__)
opts = salt.config.client_config('/data/salt/saltstack/etc/salt/master')
r_conn = RedisPool(opts.get('redis_db')).getConn()
lock = threading.Lock()
class RedisQueueDaemon(object):
    '''
    Redis queue listener: consumes task messages from the queue and
    forwards them to the backend.
    '''
    def __init__(self, r_conn):
        self.r_conn = r_conn                 # redis connection instance
        self.task_queue = 'task:prod:queue'  # task message queue (redis list)

    def listen_task(self):
        '''
        Main listening loop: block on the queue and handle each message
        in its own thread.
        '''
        while True:
            queue_item = self.r_conn.blpop(self.task_queue, 0)[1]
            print "queue get", queue_item
            # self.run_task(queue_item)
            t = threading.Thread(target=self.run_task, args=(queue_item,))
            t.start()

    def run_task(self, info):
        '''
        Dispatch a queue message according to its type.
        '''
        with lock:
            info = json.loads(info)
            if info['type'] == 'pushTaskData':
                task_data = self.getTaskData(info['jid'])
                task_data = json.loads(task_data) if task_data else []
                logger.info('cached task data: %s' % task_data)
                if task_data:
                    if self.sendTaskData2bs(task_data):
                        task_data = []
                        self.setTaskData(info['jid'], task_data)
            elif info['type'] == 'setTaskState':
                self.setTaskState(info['jid'], info['state'], info['message'])
            elif info['type'] == 'setTaskData':
                self.setTaskData(info['jid'], info['data'])

    def getTaskData(self, jid):
        return self.r_conn.hget('task:' + jid, 'data')

    def setTaskData(self, jid, data):
        self.r_conn.hset('task:' + jid, 'data', json.dumps(data))

    def sendTaskData2bs(self, task_data):
        logger.info('sending task data to the backend...')
        logger.info(task_data)
        if task_data:
            p = PostWeb('/jgapi/verify', task_data, 'pushFlowTaskData')
            result = p.postRes()
            print result
            if result['code']:
                logger.info('send succeeded!')
                return True
            else:
                logger.error('send failed!')
                return False
        else:
            return True

    def setTaskState(self, jid, state, message=''):
        logger.info('setting state of task [%s] on the backend' % str(jid))
        p = PostWeb('/jgapi/verify', {'code': jid, 'state': state, 'message': message}, 'setTaskState')
        result = p.postRes()
        if result['code']:
            logger.info('setting state of task [%s] succeeded!' % str(jid))
            return True, result
        else:
            logger.error('setting state of task [%s] failed!' % str(jid))
            return False, result
def salt_job_listener():
    '''
    Salt job listener: watches the master event bus for job returns and
    queues the results for the redis queue daemon.
    '''
    sevent = salt.utils.event.get_event(
        'master',
        sock_dir=opts['sock_dir'],
        transport=opts['transport'],
        opts=opts)
    while True:
        ret = sevent.get_event(full=True)
        if ret is None:
            continue
        if fnmatch.fnmatch(ret['tag'], 'salt/job/*/ret/*'):
            task_key = 'task:' + ret['data']['jid']
            task_state = r_conn.hget(task_key, 'state')
            task_data = r_conn.hget(task_key, 'data')
            if task_state:
                jid_data = {
                    'code': ret['data']['jid'],
                    # settings is assumed to be the project's settings module
                    'project_id': settings.SALT_MASTER_OPTS['project_id'],
                    'serverip': ret['data']['id'],
                    'returns': ret['data']['return'],
                    'name': ret['data']['id'],
                    'state': 'success' if ret['data']['success'] else 'failed',
                }
                task_data = json.loads(task_data) if task_data else []
                task_data.append(jid_data)
                logger.info("appended data: %s" % json.dumps(task_data))
                r_conn.lpush('task:prod:queue', json.dumps({'type': 'setTaskData', 'jid': ret['data']['jid'], 'data': task_data}))
                # r_conn.hset(task_key, 'data', json.dumps(task_data))
                if task_state == 'running':
                    if len(task_data) >= 1:
                        logger.info('queueing message: pushTaskData')
                        r_conn.lpush('task:prod:queue', json.dumps({'jid': ret['data']['jid'], 'type': 'pushTaskData'}))
                else:
                    logger.info('task {0} finished, sending remaining data to the backend...'.format(task_key))
                    logger.info('queueing message: pushTaskData')
                    r_conn.lpush('task:prod:queue', json.dumps({'jid': ret['data']['jid'], 'type': 'pushTaskData'}))
        print datetime.datetime.now()
def run():
    print 'start redis producer queue listener...'
    logger.info('start redis producer queue listener...')
    multiprocessing.Process(target=RedisQueueDaemon(r_conn).listen_task, args=()).start()
    print 'start salt job listener...'
    logger.info('start salt job listener...')
    multiprocessing.Process(target=salt_job_listener, args=()).start()
    '''
    # alternative: run both listeners in a process pool
    p = multiprocessing.Pool(2)
    print 'start redis producer queue listener...'
    p.apply_async(RedisQueueDaemon(r_conn).listen_task, ())
    print 'start salt job listener...'
    p.apply_async(salt_job_listener, ())
    p.close()
    p.join()
    '''
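Note that salt_job_listener only records returns for jobs whose task:<jid> hash already has a state field, so whatever dispatches the salt job is expected to create that hash (typically with state 'running') before the minions start returning. Below is a minimal sketch of that registration step plus a command-line entry point, assuming the same Redis connection is available to the dispatcher (register_task is a hypothetical helper, not part of the script above):

def register_task(r_conn, jid):
    # Hypothetical helper: pre-create the task:<jid> hash that
    # salt_job_listener checks before it starts collecting returns.
    r_conn.hset('task:' + jid, 'state', 'running')
    r_conn.hset('task:' + jid, 'data', json.dumps([]))

if __name__ == '__main__':
    run()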
This concludes the walkthrough of listening for Salt job status in Python and pushing the task data to Redis; I hope it serves as a useful reference.