Spark处理数据排序问题如何避免OOM
错误思想
举个列子,当我们想要比较一个类型为RDD[(Long,(String,Int))]的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable转换为list,然后sortby,但是这样却有一个致命的缺点,就是Iterable在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable含有元素过多,那么极易引起OOM
valcidAndSidCountGrouped:RDD[(Long,Iterable[(String,Int)])]=cidAndSidCount.groupByKey() //4.排序,取top10 valresult:RDD[(Long,List[(String,Int)])]=cidAndSidCountGrouped.map{ case(cid,sidCountIt)=> //sidCountIt排序,取前10 //Iterable转成容器式集合的时候,如果数据量过大,极有可能导致oom (cid,sidCountIt.toList.sortBy(-_._2).take(5)) }
首先,我们要知道,RDD的排序需要shuffle,是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序
方法一利用RDD排序特点
//把long(即key值)提取出来 valcids:List[Long]=categoryCountList.map(_.cid.toLong) valbuffer:ListBuffer[(Long,List[(String,Int)])]=ListBuffer[(Long,List[(String,Int)])]() //根据每个key来过滤RDD for(cid<-cids){ /* List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)),(15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)),(15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)),(15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)),(15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8))) 目标: (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9),(329b966c-d61b-46ad-949a-7e37142d384a,8),(5e3545a0-1521-4ad6-91fe-e792c20c46da,8),(e306c00b-a6c5-44c2-9c77-15e919340324,7),(bed60a57-3f81-4616-9e8b-067445695a77,7))) */ valarr:Array[(String,Int)]=cidAndSidCount.filter(cid==_._1) .sortBy(-_._2._2) .take(5) .map(_._2) buffer+=((cid,arr.toList)) } buffer.foreach(println)
这样做也有缺点:即有多少个key,就有多少个Job,占用资源
方法二利用TreeSet自动排序特性
defstatCategoryTop10Session_3(sc:SparkContext, categoryCountList:List[CategroyCount], userVisitActionRDD:RDD[UserVisitAction])={ //1.过滤出来top10品类的所有点击记录 //1.1先map出来top10的品类id valcids=categoryCountList.map(_.cid.toLong) valtopCategoryActionRDD:RDD[UserVisitAction]=userVisitActionRDD.filter(action=>cids.contains(action.click_category_id)) //2.计算每个品类下的每个session的点击量rdd((cid,sid),1) valcidAndSidCount:RDD[(Long,(String,Int))]=topCategoryActionRDD .map(action=>((action.click_category_id,action.session_id),1)) //使用自定义分区器重点理解分区器的原理 .reduceByKey(newCategoryPartitioner(cids),_+_) .map{ case((cid,sid),count)=>(cid,(sid,count)) } //3.排序取top10 //因为已经按key分好了区,所以用Mappartitions,在每个分区中新建一个TreeSet即可 valresult:RDD[(Long,List[SessionInfo])]=cidAndSidCount.mapPartitions((it:Iterator[(Long,(String,Int))])=>{ //new一个TreeSet,并同时指定排序规则 vartreeSet:mutable.TreeSet[CategorySession]=newmutable.TreeSet[CategorySession]()(newOrdering[CategorySession]{ overridedefcompare(x:CategorySession,y:CategorySession):Int={ if(x.clickCount>=y.clickCount)-1else1 } }) varid=0l iter.foreach({ case(l,session)=>{ id=l treeSet.add(session) if(treeSet.size>10)treeSet=treeSet.take(10) } }) Iterator(id,treeSet) }) result.collect.foreach(println) Thread.sleep(1000000) } } /* 根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle */ classCategoryPartitioner(cids:List[Long])extendsPartitioner{ //用cid索引,作为将来他的分区索引. privatevalcidWithIndex:Map[Long,Int]=cids.zipWithIndex.toMap //返回集合的长度 overridedefnumPartitions:Int=cids.length //根据key返回分区的索引 overridedefgetPartition(key:Any):Int={ keymatch{ //根据品类id返回分区的索引!0-9 case(cid:Long,_)=> cidWithIndex(cid) } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。