python实现多线程行情抓取工具的方法
思路
借助python当中threading模块与Queue模块组合可以方便的实现基于生产者-消费者模型的多线程模型。Jimmy大神的tushare一直是广大python数据分析以及业余量化爱好者喜爱的免费、开源的python财经数据接口包。
平时一直有在用阿里云服务器通过tushare的接口自动落地相关财经数据,但日复权行情数据以往在串行下载的过程当中,速度比较慢,有时遇到网络原因还需要重下。每只股票的行情下载过程中都需要完成下载、落地2个步骤,一个可能需要网络开销、一个需要数据库mysql的存取开销。2者原本就可以独立并行执行,是个典型的“生产者-消费者”模型。
基于queue与threading模块的线程使用一般采用以下的套路:
producerQueue=Queue() consumerQueue=Queue() lock=threading.Lock() classproducerThead(threading.Thread): def__init__(self,producerQueue,consumerQueue): self.producerQueue=producerQueue self.consumerQueue=consumerQueue defrun(self): whilenotself.thread_stop: try: #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行 item=self.producerQueue.get(block=True,timeout=20) #阻塞调用进程直到有数据可用。如果timeout是个正整数, #阻塞调用进程最多timeout秒, #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用) exceptQueue.Empty: print("Nothingtodo!threadexit!") self.thread_stop=True break #实现生产者逻辑,生成消费者需要处理的内容consumerQueue.put(someItem) #还可以边处理,边生成新的生产任务 doSomethingAboutProducing() self.producerQueue.task_done() defstop(self): self.thread_stop=True classconsumerThead(threading.Thread): def__init__(self,lock,consumerQueue): self.consumerQueue=consumerQueue defrun(self): whiletrue: try: #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行 item=self.consumerQueue.get(block=True,timeout=20) #阻塞调用进程直到有数据可用。如果timeout是个正整数, #阻塞调用进程最多timeout秒, #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用) exceptQueue.Empty: print("Nothingtodo!threadexit!") self.thread_stop=True break doSomethingAboutConsuming(lock)#处理消费者逻辑,必要时使用线程锁,如文件操作等 self.consumerQueue.task_done() #定义主线程 defmain(): foriinrange(n):#定义n个i消费者线程 t=ThreadRead(producerQueue,consumerQueue) t.setDaemon(True) t.start() producerTasks=[]#定义初始化生产者任务队列 producerQueue.put(producerTasks) foriinrange(n):#定义n个生产者钱程 t=ThreadWrite(consumerQueue,lock) t.setDaemon(True) t.start() stock_queue.join() data_queue.join()
相关接口
1,股票列表信息接口
作用
获取沪深上市公司基本情况。属性包括:
code,代码 name,名称 industry,所属行业 area,地区 pe,市盈率 outstanding,流通股本(亿) totals,总股本(亿) totalAssets,总资产(万) liquidAssets,流动资产 fixedAssets,固定资产 reserved,公积金 reservedPerShare,每股公积金 esp,每股收益 bvps,每股净资 pb,市净率 timeToMarket,上市日期 undp,未分利润 perundp,每股未分配 rev,收入同比(%) profit,利润同比(%) gpr,毛利率(%) npr,净利润率(%) holders,股东人数
调用方法
importtushareasts ts.get_stock_basics()
返回效果
nameindustryareapeoutstandingtotalstotalAssets code 600606金丰投资房产服务上海0.0051832.0151832.01744930.44 002285世联行房产服务深圳71.0476352.1776377.60411595.28 000861海印股份房产服务广东126.2083775.50118413.84730716.56 000526银润投资房产服务福建2421.169619.509619.5020065.32 000056深国商房产服务深圳0.0014305.5526508.14787195.94 600895张江高科园区开发上海171.60154868.95154868.951771040.38 600736苏州高新园区开发江苏48.68105788.15105788.152125485.75 600663陆家嘴园区开发上海47.63135808.41186768.414562074.50 600658电子城园区开发北京19.3958009.7358009.73431300.19 600648外高桥园区开发上海65.3681022.34113534.902508100.75 600639浦东金桥园区开发上海57.2865664.8892882.501241577.00 600604市北高新园区开发上海692.8733352.4256644.92329289.50
2,日复权行情接口
作用
提供股票上市以来所有历史数据,默认为前复权,读取后存到本地,作为后续分析的基础
调用方法
ts.get_h_data('002337',start='2015-01-01',end='2015-03-16')#两个日期之间的前复权数据 parameter: code:string,股票代码e.g.600848 start:string,开始日期format:YYYY-MM-DD为空时取当前日期 end:string,结束日期format:YYYY-MM-DD为空时取去年今日 autype:string,复权类型,qfq-前复权hfq-后复权None-不复权,默认为qfq index:Boolean,是否是大盘指数,默认为False retry_count:int,默认3,如遇网络等问题重复执行的次数 pause:int,默认0,重复请求数据过程中暂停的秒数,防止请求间隔时间太短出现的问题 return: date:交易日期(index) open:开盘价 high:最高价 close:收盘价 low:最低价 volume:成交量 amount:成交金额
返回结果
openhighcloselowvolumeamount date 2015-03-1613.2713.4513.3913.00812129761073862784 2015-03-1313.0413.3813.3713.0040548836532739744 2015-03-1213.2913.9513.2812.9671505720962979904 2015-03-1113.3513.4813.1513.0059110248780300736 2015-03-1013.1613.6713.5912.721057530881393819776 2015-03-0913.7714.7314.1313.701390915521994454656 2015-03-0612.1713.3913.3912.17894867041167752960 2015-03-0512.7912.8012.1712.0826040832966927360 2015-03-0413.9613.9613.3012.58266361741060270720 2015-03-0312.1713.1013.1012.0519290366733336768
实现
废话不多说,直接上代码,
生产者线程,读取行情
classThreadRead(threading.Thread): def__init__(self,queue,out_queue): ''' 用于根据股票代码、需要读取的日期,读取增量的日行情数据, :paramqueue:用于保存需要读取的股票代码、起始日期的列表 :paramout_queue:用于保存需要写入到数据库表的结果集列表 :return: ''' threading.Thread.__init__(self) self.queue=queue self.out_queue=out_queue defrun(self): whiletrue: item=self.queue.get() time.sleep(0.5) try: df_h_data=ts.get_h_data(item['code'],start=item['startdate'],retry_count=10,pause=0.01) ifdf_h_dataisnotNoneandlen(df_h_data)>0: df_h_data['secucode']=item['code'] df_h_data.index.name='date' printdf_h_data.index,item['code'],item['startdate'] df_h_data['tradeday']=df_h_data.index.strftime('%Y-%m-%d') self.out_queue.put(df_h_data) exceptException,e: printstr(e) self.queue.put(item)#将没有爬取成功的数据放回队列里面去,以便下次重试。 time.sleep(10) continue self.queue.task_done()
消费者线程,本地存储
classThreadWrite(threading.Thread): def__init__(self,queue,lock,db_engine): ''' :paramqueue:某种形式的任务队列,此处为tushare为每个股票返回的最新日复权行情数据 :paramlock:暂时用连接互斥操作,防止mysql高并发,后续可尝试去掉 :paramdb_engine:mysql数据库的连接对象 :return:no ''' threading.Thread.__init__(self) self.queue=queue self.lock=lock self.db_engine=db_engine defrun(self): whileTrue: item=self.queue.get() self._save_data(item) self.queue.task_done() def_save_data(self,item): withself.lock: try: item.to_sql('cron_dailyquote',self.db_engine,if_exists='append',index=False) exceptException,e:#如果是新股,则有可能df_h_data是空对象,因此需要跳过此类情况不处理 printstr(e)
定义主线程
fromQueueimportQueue stock_queue=Queue() data_queue=Queue() lock=threading.Lock() defmain(): ''' 用于测试多线程读取数据 :return: ''' #获取环境变量,取得相应的环境配置,上线时不需要再变更代码 globalstock_queue globaldata_queue config=os.getenv('FLASK_CONFIG') ifconfig=='default': db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4' else: db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4' db_engine=create_engine(db_url,echo=True) conn=db_engine.connect() #TODO增加ts.get_stock_basics()报错的处理,如果取不到信息则直接用数据库中的股票代码信息,来获取增量信息 #TODO增加一个标志,如果一个股票代码的最新日期不是最新日期,则需标记该代码不需要重新获取数据,即记录该股票更新日期到了最新工作日, df=ts.get_stock_basics() df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code':CHAR(6)}) #计算距离当前日期最大的工作日,以便每日定时更新 today=time.strftime('%Y-%m-%d',time.localtime(time.time())) s1=("selectmax(t.date)fromcron_tradedaytwhereflag=1andt.date<='"+today+"'") selectsql=text(s1) maxTradeay=conn.execute(selectsql).first() #计算每只股票当前加载的最大工作日期,支持重跑 s=("selectsecucode,max(t.tradeday)fromcron_dailyquotetgroupbysecucode") selectsql=text(s) result=conn.execute(selectsql)#执行查询语句 df_result=pd.DataFrame(result.fetchall()) df_result.columns=['stockcode','max_tradeday'] df_result.set_index(df_result['stockcode'],inplace=True) #开始归档前复权历史行情至数据库当中,以便可以方便地计算后续选股模型 foriinrange(3):#使用3个线程 t=ThreadRead(stock_queue,data_queue) t.setDaemon(True) t.start() forcodeinset(list(df.index)): try: #如果当前股票已经是最新的行情数据,则直接跳过,方便重跑。 #printmaxTradeay[0],df_result.loc[code].values[1] ifdf_result.loc[code].values[1]==maxTradeay[0]: continue startdate=getLastNdate(df_result.loc[code].values[1],1) exceptException,e: #如果某只股票没有相关的行情,则默认开始日期为2015年1月1日 startdate='2015-01-01' item={} item['code']=code item['startdate']=startdate stock_queue.put(item)#生成生产者任务队列 foriinrange(3): t=ThreadWrite(data_queue,lock,db_engine) t.setDaemon(True) t.start() stock_queue.join() data_queue.join()
执行效果
原本需要2,3个小时才能执行完成的每日复权行情增量落地,有效缩短至了1小时以内,这里线程数并不上越多越好,由于复权行情读的是新浪接口,在高并发情况下会返回HTTP503服务器过载的错误,另外高并发下可能需要使用IP代理池,下载的时段也需要尝试多个时段进行。初次尝试,如果有更好的方法或者哪里有考虑不周的地方欢迎留言建议或者指正。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。