python 解决动态的定义变量名,并给其赋值的方法(大数据处理)
最近消费kafka数据到磁盘的时候遇到了这样的问题:
需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。
问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,
尝试:中间过程想到了动态定义变量名,即
定义第七个字段:seven=data.split('|')[7]
定义文件名:filename=time_stamp+'_'+seven+'.tmp',
定义文件计数器:seven+‘_num'=0
定义文件时间戳:seven+'_stamp'=time.time()
想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了
解决:最后使用三个字典将这个问题完美解决,
定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;
定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;
定义一个字典用来存大类,键对应代号,值对应分类;
局部功能代码如下:
defkafka_to_disk():
print('启动前检测上次运行时是否存在意外中断的数据文件......')
print('搜索最近一次执行脚本产生的时间目录......')
#待处理临时文件列表
tmp_list=[]
try:
forcategory_dirinos.listdir(local_file_path):
iflen(os.listdir(local_file_path+os.sep+category_dir))>0:
forfileinos.listdir(local_file_path+os.sep+category_dir):
ifsuffixinfile:
tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)
#print('上次运行程序产生的临时文件有---{}'.format(tmp_list))
exceptExceptionase:
pass
iflen(tmp_list)==0:
print('未扫描任何残留临时文件')
else:
print('开始修复残留临时文件......')
tmp_num=0
fortmpintmp_list:
os.rename(tmp,tmp.split('.')[0]+'.out')
tmp_num+=1
print('本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★'.format(tmp_num))
category_poor={
'1':'news','2':'weibo','3':'weixin','4':'app','5':'newspaper','6':'luntan',
'7':'blog','8':'video','9':'shangji','10':'shangjia','11':'gtzy','12':'zfztb',
'13':'gyfp','14':'gjz','15':'zfxx','16':'ptztb','17':'company','18':'house',
'19':'hospital','20':'bank','21':'zone','22':'express','23':'zpgw','24':'zscq',
'25':'hotel','26':'cpws','27':'gxqy','28':'gpjj','29':'dtyy','30':'bdbk'}
time_stamp=utils.get_time_stamp()#初始化毫秒级时间戳:20180509103015125
consumer=KafkaConsumer(topic,group_id=group_id,auto_offset_reset=auto_offset_reset,bootstrap_servers=eval(bootstrap_servers))
print('连接kafka成功,数据筛选中......')
file_poor={}#子类池用于文件计数器
time_stamp_poor={}#子类时间戳池,用于触发文件切换
time_stamp=utils.get_time_stamp()#初始化毫秒级时间戳:20180509103015125
formessageinconsumer:
#提取第8个字段自动匹配目录进行创建
ifmessage.value.decode().split('|')[1]incategory_poor:
category=category_poor[message.value.decode().split('|')[1]]
else:
print(message.value.decode())
continue
category_dir=local_file_path+os.sep+category
ifnotos.path.exists(category_dir):
os.makedirs(category_dir)
#提取第2个字段,用于生成文件名
ifmessage.value.decode().split('|')[7]intime_stamp_poor:
shot_file_name=time_stamp_poor[message.value.decode().split('|')[7]]+'_'+message.value.decode().split('|')[7]
else:
shot_file_name=time_stamp+'_'+message.value.decode().split('|')[7]
file_name=category_dir+os.sep+shot_file_name+'.tmp'
#给每一个文件设定一个计数器
ifmessage.value.decode().split('|')[7]notinfile_poor:
file_poor[message.value.decode().split('|')[7]]=0
withopen(file_name,'a',encoding='utf-8')asf1:
f1.write(message.value.decode())
file_poor[message.value.decode().split('|')[7]]+=1
#触发切换文件的操作,用时间戳生成第二文件名
iffile_poor[message.value.decode().split('|')[7]]==strip_number:
time_stamp_poor[message.value.decode().split('|')[7]]=utils.get_time_stamp()
file_poor[message.value.decode().split('|')[7]]=0
以上这篇python解决动态的定义变量名,并给其赋值的方法(大数据处理)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。