python 多进程队列数据处理详解
我就废话不多说了,直接上代码吧!
#-*-coding:utf8-*- importpaho.mqtt.clientasmqtt frommultiprocessingimportProcess,Queue importtime,random,os importcamera_person_num MQTTHOST="172.19.4.4" MQTTPORT=1883 mqttClient=mqtt.Client() q=Queue() #连接MQTT服务器 defon_mqtt_connect(): mqttClient.connect(MQTTHOST,MQTTPORT,60) mqttClient.loop_start() #消息处理函数 defon_message_come(lient,userdata,msg): #print(msg.topic+":"+str(msg.payload.decode("utf-8"))) q.put(msg.payload.decode("utf-8"))#放入队列 print("产生消息",msg.payload.decode("utf-8")) #消息处理开启多进程 #p=Process(target=talk,args=("/camera/person/num/result",msg.payload.decode("utf-8"))) #p.start() defconsumer(q,pid): print("开启消费序列进程",pid) whileTrue: msg=q.get() #p=Process(target=talk,args=("/camera/person/num/result",msg,pid)) #p.start() talk("/camera/person/num/result",msg,pid) #subscribe消息订阅 defon_subscribe(): mqttClient.subscribe("test123",1)#主题为"test" mqttClient.on_message=on_message_come#消息到来处理函数 #publish消息发布 defon_publish(topic,msg,qos): mqttClient.publish(topic,msg,qos); #多进程中发布消息需要重新初始化mqttClient deftalk(topic,msg,pid): cameraPsersonNum=camera_person_num.CameraPsersonNum(msg) t_max,t_mean,t_min=cameraPsersonNum.personNum() #time.sleep(20) print("消费消息",pid,msg) mqttClient2=mqtt.Client() mqttClient2.connect(MQTTHOST,MQTTPORT,60) mqttClient2.loop_start() mqttClient2.publish(topic,'{"max":'+str(t_max)+',"mean":'+str(t_mean)+',"min:"'+t_min+'}',1) mqttClient2.disconnect() defmain(): on_mqtt_connect() on_subscribe() foriinrange(1,3): c1=Process(target=consumer,args=(q,i)) c1.start() whileTrue: pass if__name__=='__main__': main()
以上这篇python多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。