Python如何把Spark数据写入ElasticSearch
这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。
实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。
如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。
下载完成后,放在本地目录,以下面命令方式启动pyspark:
pyspark--jarselasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,请设置环境变量:
exportPYSPARK_PYTHON=/usr/bin/python3
理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式
{"id:{therestofyourjson}}
往下会展示如何转换成这种格式。
解析Apache日志文件
我们将Apache的日志文件读入,构建SparkRDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字
rdd=sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+)(\S+)(\S+)\[([\w:/]+\s[+\-]\d{4})\]"(\S+)\s?(\S+)?\s?(\S+)?"(\d{3}|-)(\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
defparse(str):
s=p.match(str)
d={}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
returnd
换句话说,我们刚开始从日志文件读入RDD的数据类似如下:
['83.149.9.216--[17/May/2015:10:05:03+0000]"GET/presentations/logstash-monitorama-2013/images/kibana-search.pngHTTP/1.1"200203023"http://semicomplete.com/presentations/logstash-monitorama-2013/""Mozilla/5.0(Macintosh;IntelMacOSX10_9_1)AppleWebKit/537.36(KHTML,likeGecko)Chrome/32.0.1700.77Safari/537.36"']
然后我们使用map函数转换每条记录:
rdd2=rdd.map(parse)
rdd2.take(1)
[{'date':'17/May/2015:10:05:03+0000','ip':'83.149.9.216','operation':'GET','uri':'/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。
我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”:“doc_id”告诉ES我们将这个字段作为ID。
这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
计算结果类似如下,可以看到ID是一个很长的SHA数值。
rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c','{"date":"17/May/2015:10:05:03+0000","ip":"83.149.9.216","operation":"GET","doc_id":"a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c","uri":"/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
现在我们需要制定ES配置,比较重要的两项是:
- “es.resource”:‘walker/apache':"walker"是索引,apache是类型,两者一般合称索引
- “es.mapping.id”:“doc_id”:告诉ES那个字段作为整个文档的ID,也就是查询结果中的_id
其他的配置自己去探索。
然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节
es_write_conf={
"es.nodes":"localhost",
"es.port":"9200",
"es.resource":'walker/apache',
"es.input.json":"yes",
"es.mapping.id":"doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
rdd3=rdd2.map(addID)
defaddId(data):
j=json.dumps(data).encode('ascii','ignore')
data['doc_id']=hashlib.sha224(j).hexdigest()
return(data['doc_id'],json.dumps(data))
最后我们可以使用curl进行查询
curlhttp://localhost:9200s/walker/apache/_search?pretty=true&?q=*
{
"_index":"walker",
"_type":"apache",
"_id":"227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"_score":1.0,
"_source":{
"date":"17/May/2015:10:05:32+0000",
"ip":"91.177.205.119",
"operation":"GET",
"doc_id":"227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"uri":"/favicon.ico"
}
如下是所有代码:
importjson
importhashlib
importre
defaddId(data):
j=json.dumps(data).encode('ascii','ignore')
data['doc_id']=hashlib.sha224(j).hexdigest()
return(data['doc_id'],json.dumps(data))
defparse(str):
s=p.match(str)
d={}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
returnd
regex='^(\S+)(\S+)(\S+)\[([\w:/]+\s[+\-]\d{4})\]"(\S+)\s?(\S+)?\s?(\S+)?"(\d{3}|-)(\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
rdd=sc.textFile("/home/ubuntu/walker/apache_logs")
rdd2=rdd.map(parse)
rdd3=rdd2.map(addID)
es_write_conf={
"es.nodes":"localhost",
"es.port":"9200",
"es.resource":'walker/apache',
"es.input.json":"yes",
"es.mapping.id":"doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
也可以这么封装,其实原理是一样的
importhashlib
importjson
frompysparkimportSparkcontext
defmake_md5(line):
md5_obj=hashlib.md5()
md5_obj.encode(line)
returnmd5_obj.hexdigest()
defparse(line):
dic={}
l=line.split('\t')
doc_id=make_md5(line)
dic['name']=l[1]
dic['age']=l[2]
dic['doc_id']=doc_id
returndic#记得这边返回的是字典类型的,在写入es之前要记得dumps
defsaveData2es(pdd,es_host,port,index,index_type,key):
"""
把saprk的运行结果写入es
:parampdd:一个rdd类型的数据
:parames_host:要写es的ip
:paramindex:要写入数据的索引
:paramindex_type:索引的类型
:paramkey:指定文档的id,就是要以文档的那个字段作为_id
:return:
"""
#实例es客户端记得单例模式
ifes.exist.index(index):
es.index.create(index,'spo')
es_write_conf={
"es.nodes":es_host,
"es.port":port,
"es.resource":index/index_type,
"es.input.json":"yes",
"es.mapping.id":key
}
(pdd.map(lambda_dic:('',json.dumps(_dic))))#这百年是为把这个数据构造成元组格式,如果传进来的_dic是字典则需要jdumps,如果传进来之前就已经dumps,这便就不需要dumps了
.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
)
if__name__=='__main__':
#实例化sp对象
sc=Sparkcontext()
#文件中的呢内容一行一行用sc的读取出来
json_text=sc.textFile('./1.txt')
#进行转换
json_data=json_text.map(lambdaline:parse(line))
saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')
sc.stop()
看到了把,面那个例子在写入es之前加了一个id,返回一个元组格式的,现在这个封装指定_id就会比较灵活了
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。