python实现的文件同步服务器实例
本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore,收到文件后保存到本地。
客户端使用pyinotify监视目录的变化,把变动的文件发送到服务端。
重点:
1.使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。
2.客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
#receivefilefromclientandstorethemintofileuseasyncore.# #/usr/bin/python #coding:utf-8 importasyncore importsocket fromsocketimporterrno importlogging importtime importsys importstruct importos importfcntl importthreading fromrrd_graphimportMakeGraph try: importrrdtool except(ImportError,ImportWarnning): print"Hopethisinformationcanhelpyou:" print"Cannotfindpyinotifymoduleinsyspath,justrun[apt-getinstallpython-rrdtool]inubuntu." sys.exit(1) classRequestHandler(asyncore.dispatcher): def__init__(self,sock,map=None,chunk_size=1024): self.logger=logging.getLogger('%s-%s'%(self.__class__.__name__,str(sock.getsockname()))) self.chunk_size=chunk_size asyncore.dispatcher.__init__(self,sock,map) self.data_to_write=list() defreadable(self): #self.logger.debug("readable()called.") returnTrue defwritable(self): response=(notself.connected)orlen(self.data_to_write) #self.logger.debug('writable()->%sdatalength->%s'%(response,len(self.data_to_write))) returnresponse defhandle_write(self): data=self.data_to_write.pop() #self.logger.debug("handle_write()->%ssize:%s",data.rstrip('\r\n'),len(data)) sent=self.send(data[:self.chunk_size]) ifsent<len(data): remaining=data[sent:] self.data_to_write.append(remaining) defhandle_read(self): self.writen_size=0 nagios_perfdata='../perfdata' head_packet_format="!LL128s128sL" head_packet_size=struct.calcsize(head_packet_format) data=self.recv(head_packet_size) ifnotdata: return filepath_len,filename_len,filepath,filename,filesize=struct.unpack(head_packet_format,data) filepath=os.path.join(nagios_perfdata,filepath[:filepath_len]) filename=filename[:filename_len] self.logger.debug("updatefile:%s"%filepath+'/'+filename) try: ifnotos.path.exists(filepath): os.makedirs(filepath) exceptOSError: pass self.fd=open(os.path.join(filepath,filename),'w') #self.fd=open(filename,'w') iffilesize>self.chunk_size: times=filesize/self.chunk_size first_part_size=times*self.chunk_size second_part_size=filesize%self.chunk_size while1: try: data=self.recv(self.chunk_size) #self.logger.debug("handle_read()->%ssize.",len(data)) exceptsocket.error,e: ife.args[0]==errno.EWOULDBLOCK: print"EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Errorhappendwhilereceivedata:%s"%e) break else: self.fd.write(data) self.fd.flush() self.writen_size+=len(data) ifself.writen_size==first_part_size: break #receivethepacketatlast while1: try: data=self.recv(second_part_size) #self.logger.debug("handle_read()->%ssize.",len(data)) exceptsocket.error,e: ife.args[0]==errno.EWOULDBLOCK: print"EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Errorhappendwhilereceivedata:%s"%e) break else: self.fd.write(data) self.fd.flush() self.writen_size+=len(data) iflen(data)==second_part_size: break eliffilesize<=self.chunk_size: while1: try: data=self.recv(filesize) #self.logger.debug("handle_read()->%ssize.",len(data)) exceptsocket.error,e: ife.args[0]==errno.EWOULDBLOCK: print"EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Errorhappendwhilereceivedata:%s"%e) break else: self.fd.write(data) self.fd.flush() self.writen_size+=len(data) iflen(data)==filesize: break self.logger.debug("Filesize:%s"%self.writen_size) classSyncServer(asyncore.dispatcher): def__init__(self,host,port): asyncore.dispatcher.__init__(self) self.debug=True self.logger=logging.getLogger(self.__class__.__name__) self.create_socket(socket.AF_INET,socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host,port)) self.listen(2000) defhandle_accept(self): client_socket=self.accept() ifclient_socketisNone: pass else: sock,addr=client_socket #self.logger.debug("Incomingconnectionfrom%s"%repr(addr)) handler=RequestHandler(sock=sock) classRunServer(threading.Thread): def__init__(self): super(RunServer,self).__init__() self.daemon=False defrun(self): server=SyncServer('',9999) asyncore.loop(use_poll=True) defStartServer(): logging.basicConfig(level=logging.DEBUG, format='%(name)s:%(message)s', ) RunServer().start() #MakeGraph().start() if__name__=='__main__': StartServer()
客户端:
#monitorpathwithinotify(pythonmodule),andsendthemtoremoteserver.# #usesendfile(2)insteadofsendfunctioninsocket,ifwehavepython-sendfileinstalled.# importsocket importtime importos importsys importstruct importthreading importQueue try: importpyinotify except(ImportError,ImportWarnning): print"Hopethisinformationcanhelpyou:" print"Cannotfindpyinotifymoduleinsyspath,justrun[apt-getinstallpython-pyinotify]inubuntu." sys.exit(1) try: fromsendfileimportsendfile except(ImportError,ImportWarnning): pass filetype_filter=[".rrd",".xml"] defcheck_filetype(pathname): forsuffix_nameinfiletype_filter: ifpathname[-4:]==suffix_name: returnTrue try: end_string=pathname.rsplit('.')[-1:][0] end_int=int(end_string) except: pass else: #meanspathnameendwithdigit returnFalse classsync_file(threading.Thread): def__init__(self,addr,events_queue): super(sync_file,self).__init__() self.daemon=False self.queue=events_queue self.addr=addr self.chunk_size=1024 defrun(self): while1: event=self.queue.get() ifcheck_filetype(event.pathname): printtime.asctime(),event.maskname,event.pathname filepath=event.path.split('/')[-1:][0] filename=event.name filesize=os.stat(os.path.join(event.path,filename)).st_size sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) filepath_len=len(filepath) filename_len=len(filename) sock.connect(self.addr) offset=0 data=struct.pack("!LL128s128sL",filepath_len,filename_len,filepath,filename,filesize) fd=open(event.pathname,'rb') sock.sendall(data) if"sendfile"insys.modules: #print"usesendfile(2)" while1: sent=sendfile(sock.fileno(),fd.fileno(),offset,self.chunk_size) ifsent==0: break offset+=sent else: #print"useoriginalsendfunction" while1: data=fd.read(self.chunk_size) ifnotdata:break sock.send(data) sock.close() fd.close() classEventHandler(pyinotify.ProcessEvent): def__init__(self,events_queue): super(EventHandler,self).__init__() self.events_queue=events_queue defmy_init(self): pass defprocess_IN_CLOSE_WRITE(self,event): self.events_queue.put(event) defprocess_IN_MOVED_TO(self,event): self.events_queue.put(event) defstart_notify(path,mask,sync_server): events_queue=Queue.Queue() sync_thread_pool=list() foriinrange(500): sync_thread_pool.append(sync_file(sync_server,events_queue)) foriinsync_thread_pool: i.start() wm=pyinotify.WatchManager() notifier=pyinotify.Notifier(wm,EventHandler(events_queue)) wdd=wm.add_watch(path,mask,rec=True) notifier.loop() defdo_notify(): perfdata_path='/var/lib/pnp4nagios/perfdata' mask=pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO sync_server=('127.0.0.1',9999) start_notify(perfdata_path,mask,sync_server) if__name__=='__main__': do_notify()
python监视线程池
#!/usr/bin/python importthreading importtime classMonitor(threading.Thread): def__init__(self,*args,**kwargs): super(Monitor,self).__init__() self.daemon=False self.args=args self.kwargs=kwargs self.pool_list=[] defrun(self): printself.args printself.kwargs forname,valueinself.kwargs.items(): obj=value[0] temp={} temp[name]=obj self.pool_list.append(temp) while1: printself.pool_list forname,valueinself.kwargs.items(): obj=value[0] parameters=value[1:] died_threads=self.cal_died_thread(self.pool_list,name) print"died_threads",died_threads ifdied_threads>0: foriinrange(died_threads): print"start%sthread..."%name t=obj[0].__class__(*parameters) t.start() self.add_to_pool_list(t,name) else: break time.sleep(0.5) defcal_died_thread(self,pool_list,name): i=0 foriteminself.pool_list: fork,vinitem.items(): ifname==k: lists=v fortinlists: ifnott.isAlive(): self.remove_from_pool_list(t) i+=1 returni defadd_to_pool_list(self,obj,name): foriteminself.pool_list: fork,vinitem.items(): ifname==k: v.append(obj) defremove_from_pool_list(self,obj): foriteminself.pool_list: fork,vinitem.items(): try: v.remove(obj) except: pass else: return
使用方法:
rrds_queue=Queue.Queue() make_rrds_pool=[] foriinrange(5): make_rrds_pool.append(MakeRrds(rrds_queue)) foriinmake_rrds_pool: i.start() make_graph_pool=[] foriinrange(5): make_graph_pool.append(MakeGraph(rrds_queue)) foriinmake_graph_pool: i.start() monitor=Monitor(make_rrds_pool=(make_rrds_pool,rrds_queue),\ make_graph_pool=(make_graph_pool,rrds_queue)) monitor.start()
解析:
1.接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2.每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3.如果没有线程死去,则什么也不做。
从外部调用Django模块
importos importsys sys.path.insert(0,'/data/cloud_manage') fromdjango.core.managementimportsetup_environ importsettings setup_environ(settings) fromcommon.monitorimportMonitor fromdjango.dbimportconnection,transaction
前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的Python程序设计有所帮助。