Spark调优多线程并行处理任务实现方式
方式1:
1.明确Spark中Job与Streaming中Job的区别
1.1SparkCore
一个RDDDAGGraph可以生成一个或多个Job(Action操作)
一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算
Job在spark里应用里是一个被调度的单位
1.2Streaming
一个batch的数据对应一个DStreamGraph
而一个DStreamGraph包含一或多个关于DStream的输出操作
每一个输出对应于一个Job,一个DStreamGraph对应一个JobSet,里面包含一个或多个Job
2.StreamingJob的并行度
Job的并行度由两个配置决定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一个Batch可能会有多个Action执行,比如注册了多个Kafka数据流,每个Action都会产生一个Job
所以一个Batch有可能是一批Job,也就是JobSet的概念
这些Job由jobExecutor依次提交执行
而JobExecutor是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job
这里说的池子,大小就是由spark.streaming.concurrentJobs控制的
concurrentJobs决定了向SparkCore提交Job的并行度
提交一个Job,必须等这个执行完了,才会提交第二个
假设我们把它设置为2,则会并发的把Job提交给SparkCore
Spark有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)
默认是FIFO,也就是先进先出,把concurrentJobs设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job
虽然如此,如果资源够两个job运行,还是会并行运行两个Job
SparkStreaming不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs","3")//job并行对
conf.set("spark.scheduler.mode","FIFO")
valsc=newStreamingContext(conf,Seconds(5))
你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:
有延时发生了,batch无法在本batch完成
concurrentJobs>1
如果schedulermode是FIFO则需要某个Job无法一直消耗掉所有资源
Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。
方式2:
场景1:
程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。
一个spark程序处理的时间在1-2小时波动是OK的。而sparkstreaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。
场景2:
- 程序需要处理的相似job数随着业务的增长越来越多
- 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
- spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而sparkstreaming是准实时的,如果业务增长导致延迟增加就很不合理。
spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务
DStream.foreachRDD{ rdd=> //创建线程池 valexecutors=Executors.newFixedThreadPool(rules.length) //将规则放入线程池 for(ru<-rules){ valtask=executors.submit(newCallable[String]{ overridedefcall():String={ //执行规则 runRule(ru,spark) } }) } //每次创建的线程池执行完所有规则后shutdown executors.shutdown() }
注意点
1.最后需要executors.shutdown()。
- 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
- 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
- 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。
2.可不可以将创建线程池放到foreachRDD外面?
不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。
3.线程池executor崩溃了就会导致数据丢失
原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。