Python使用multiprocessing实现一个最简单的分布式作业调度系统
mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。
介绍
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。
想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。
实现
Job
首先创建一个Job类,为了测试简单,只包含一个jobid属性
job.py
#!/usr/bin/envpython #-*-coding:utf-8-*- classJob: def__init__(self,job_id): self.job_id=job_id
Master
Master用来派发作业和显示运行完成的作业信息
master.py
#!/usr/bin/envpython #-*-coding:utf-8-*- fromQueueimportQueue frommultiprocessing.managersimportBaseManager fromjobimportJob
classMaster:
def__init__(self): #派发出去的作业队列 self.dispatched_job_queue=Queue() #完成的作业队列 self.finished_job_queue=Queue() defget_dispatched_job_queue(self): returnself.dispatched_job_queue defget_finished_job_queue(self): returnself.finished_job_queue defstart(self): #把派发作业队列和完成作业队列注册到网络上 BaseManager.register('get_dispatched_job_queue',callable=self.get_dispatched_job_queue) BaseManager.register('get_finished_job_queue',callable=self.get_finished_job_queue) #监听端口和启动服务 manager=BaseManager(address=('0.0.0.0',8888),authkey='jobs') manager.start() #使用上面注册的方法获取队列 dispatched_jobs=manager.get_dispatched_job_queue() finished_jobs=manager.get_finished_job_queue() #这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业 job_id=0 whileTrue: foriinrange(0,10): job_id=job_id+1 job=Job(job_id) print('Dispatchjob:%s'%job.job_id) dispatched_jobs.put(job) whilenotdispatched_jobs.empty(): job=finished_jobs.get(60) print('FinishedJob:%s'%job.job_id) manager.shutdown() if__name__=="__main__": master=Master() master.start()
Slave
Slave用来运行master派发的作业并将结果返回
slave.py
#!/usr/bin/envpython #-*-coding:utf-8-*- importtime fromQueueimportQueue frommultiprocessing.managersimportBaseManager fromjobimportJob
classSlave:
def__init__(self): #派发出去的作业队列 self.dispatched_job_queue=Queue() #完成的作业队列 self.finished_job_queue=Queue()
defstart(self):
#把派发作业队列和完成作业队列注册到网络上 BaseManager.register('get_dispatched_job_queue') BaseManager.register('get_finished_job_queue') #连接master server='127.0.0.1' print('Connecttoserver%s...'%server) manager=BaseManager(address=(server,8888),authkey='jobs') manager.connect() #使用上面注册的方法获取队列 dispatched_jobs=manager.get_dispatched_job_queue() finished_jobs=manager.get_finished_job_queue() #运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业 whileTrue: job=dispatched_jobs.get(timeout=1) print('Runjob:%s'%job.job_id) time.sleep(1) finished_jobs.put(job) if__name__=="__main__": slave=Slave() slave.start()
测试
分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下
master
$pythonmaster.py Dispatchjob:1 Dispatchjob:2 Dispatchjob:3 Dispatchjob:4 Dispatchjob:5 Dispatchjob:6 Dispatchjob:7 Dispatchjob:8 Dispatchjob:9 Dispatchjob:10 FinishedJob:1 FinishedJob:2 FinishedJob:3 FinishedJob:4 FinishedJob:5 FinishedJob:6 FinishedJob:7 FinishedJob:8 FinishedJob:9 Dispatchjob:11 Dispatchjob:12 Dispatchjob:13 Dispatchjob:14 Dispatchjob:15 Dispatchjob:16 Dispatchjob:17 Dispatchjob:18 Dispatchjob:19 Dispatchjob:20 FinishedJob:10 FinishedJob:11 FinishedJob:12 FinishedJob:13 FinishedJob:14 FinishedJob:15 FinishedJob:16 FinishedJob:17 FinishedJob:18 Dispatchjob:21 Dispatchjob:22 Dispatchjob:23 Dispatchjob:24 Dispatchjob:25 Dispatchjob:26 Dispatchjob:27 Dispatchjob:28 Dispatchjob:29 Dispatchjob:30
slave1
$pythonslave.py Connecttoserver127.0.0.1... Runjob:1 Runjob:2 Runjob:3 Runjob:5 Runjob:7 Runjob:9 Runjob:11 Runjob:13 Runjob:15 Runjob:17 Runjob:19 Runjob:21 Runjob:23
slave2
$pythonslave.py Connecttoserver127.0.0.1... Runjob:4 Runjob:6 Runjob:8 Runjob:10 Runjob:12 Runjob:14 Runjob:16 Runjob:18 Runjob:20 Runjob:22 Runjob:24
以上内容是小编给大家介绍的Python使用multiprocessing实现一个最简单的分布式作业调度系统,希望对大家有所帮助!