python多线程扫描端口(线程池)
扫描服务器ip开放端口,用线程池ThreadPoolExecutor,i7的cpu可以开到600个左右现成,大概20s左右扫描完65535个端口,根据电脑配置适当降低线程数
#!/usr/local/python3.6.3/bin/python3.6 #coding=utf-8 importsocket importdatetime importre fromconcurrent.futuresimportThreadPoolExecutor,wait DEBUG=False #判断ip地址输入是否符合规范 defcheck_ip(ipAddr): compile_ip=re.compile('^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$') ifcompile_ip.match(ipAddr): returnTrue else: returnFalse #扫描端口程序 defportscan(ip,port): try: s=socket.socket() s.settimeout(0.2) s.connect((ip,port)) openstr=f'[+]{ip}port:{port}open' print(openstr) exceptExceptionase: ifDEBUGisTrue: print(ip+str(port)+str(e)) else: returnf'[+]{ip}port:{port}error' finally: s.close #主程序,利用ThreadPoolExecutor创建600个线程同时扫描端口 defmain(): whileTrue: ip=input("请输入ip地址:") ifcheck_ip(ip): start_time=datetime.datetime.now() executor=ThreadPoolExecutor(max_workers=600) t=[executor.submit(portscan,ip,n)forninrange(1,65536)] ifwait(t,return_when='ALL_COMPLETED'): end_time=datetime.datetime.now() print("扫描完成,用时:",(end_time-start_time).seconds) break if__name__=='__main__': main()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。