详解SpringBoot和SpringBatch 使用
什么是SpringBatch
SpringBatch是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。SpringBatch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使的已经使用Spring框架的开发者或者企业更容易访问和利用企业服务。
SpringBatch提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,SpringBatch同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过SpringBatch能够支持简单的、复杂的和大数据量的批处理作业。
SpringBatch使用
我们首先配置SpringBatch在SpringBoot中的使用,数据库用的是mysql,pom文件如下,因为SpringBoot中的SpringBatch包含hsqsldb所以我们将其去除
org.springframework.boot spring-boot-starter-batch org.hsqldb hsqldb org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-web org.hibernate hibernate-validator mysql mysql-connector-java 5.1.21 org.springframework.boot spring-boot-starter-test test
配置好我们需要的实体类。页面就不展示了。
如果有数据校验添加的话那么我们需要配置自定义的检验器。若果没有课略过该步骤
publicclassCsvBeanValidatorimplementsValidator ,InitializingBean{ privatejavax.validation.Validatorvalidator; @Override publicvoidvalidate(Tvalue)throwsValidationException{ Set >constraintViolations=validator.validate(value); if(constraintViolations.size()>0){ StringBuildermessage=newStringBuilder(); for(ConstraintViolation constraintViolation:constraintViolations){ message.append(constraintViolation.getMessage()+"\n"); } thrownewValidationException(message.toString()); } } //在这里我们使用的是JSR-303校验数据,在此进行初始化 @Override publicvoidafterPropertiesSet()throwsException{ ValidatorFactoryvalidatorFactory=Validation.buildDefaultValidatorFactory(); validator=validatorFactory.usingContext().getValidator(); } } publicclassCsvItemProcessorextendsValidatingItemProcessor { @Override publicPersonprocess(Personitem)throwsValidationException{ super.process(item);//在这里启动然后才会调用我们自定义的校验器,否则不能通过。 if(item.getNation().equals("汉族")){ item.setName("01"); }else{ item.setNation("02"); } returnitem; } }
进行job任务监听自定义类实现JobExecutionListener即可
longstartTime; longendTime; @Override publicvoidbeforeJob(JobExecutionjobExecution){ startTime=System.currentTimeMillis(); System.out.println("任务处理开始"); } @Override publicvoidafterJob(JobExecutionjobExecution){ endTime=System.currentTimeMillis(); System.out.println("耗时多长时间:"+(endTime-startTime)+"ms"); System.out.println("任务处理结束"); }
进行SpringBatch的注入方法有xml文件注入bean,在这里选择java注入
@Configuration @EnableBatchProcessing//开启批处理 publicclassCsvBatchConfig{ /**1首先我们通过FlatFileItemReader读取我们需要的文件通过setResource来实现 *2设置map在这里通过先设置解析器setLineTokenizer来解析我们csv文件中的数据 *3setFieldSetMapper将我们需要的数据转化为我们的实体对象存储 *4如果想跳过前面的几行需要使用setLinesToSkip就可以实现 */ @Bean publicItemReaderreader()throwsException{ FlatFileItemReader reader=newFlatFileItemReader ();//1 reader.setResource(newClassPathResource("people.csv"));//2 reader.setLineMapper(newDefaultLineMapper (){{//3 setLineTokenizer(newDelimitedLineTokenizer(){{ setNames(newString[]{"name","age","nation","address"}); }}); setFieldSetMapper(newBeanWrapperFieldSetMapper (){{ setTargetType(Person.class); }}); }}); reader.setLinesToSkip(3); returnreader; } @Bean publicItemProcessor processor(){ CsvItemProcessorprocessor=newCsvItemProcessor();//1 processor.setValidator(csvBeanValidator());//2 returnprocessor; } /** *写入数据到数据库中 *1执行的sql语句2设置数据源 */ @Bean publicItemWriter writer(DataSourcedataSource){//1 JdbcBatchItemWriter writer=newJdbcBatchItemWriter ();//2 writer.setItemSqlParameterSourceProvider(newBeanPropertyItemSqlParameterSourceProvider ()); Stringsql="insertintoperson"+"(id,name,age,nation,address)" +"values(hibernate_sequence.nextval,:name,:age,:nation,:address)"; writer.setSql(sql);//3 writer.setDataSource(dataSource); returnwriter; } //作业的仓库就是设置数据源 @Bean publicJobRepositoryjobRepository(DataSourcedataSource,PlatformTransactionManagertransactionManager) throwsException{ JobRepositoryFactoryBeanjobRepositoryFactoryBean=newJobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("mysql"); returnjobRepositoryFactoryBean.getObject(); } //调度器使用它来执行我们的批处理 @Bean publicSimpleJobLauncherjobLauncher(DataSourcedataSource,PlatformTransactionManagertransactionManager) throwsException{ SimpleJobLauncherjobLauncher=newSimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource,transactionManager)); returnjobLauncher; } //将监听器加入到job中 @Bean publicJobimportJob(JobBuilderFactoryjobs,Steps1){ returnjobs.get("importJob") .incrementer(newRunIdIncrementer()) .flow(s1)//1 .end() .listener(csvJobListener())//2 .build(); } //步骤绑定reader与writer一次性处理65000条记录 @Bean publicStepstep1(StepBuilderFactorystepBuilderFactory,ItemReader reader,ItemWriter writer, ItemProcessor processor){ returnstepBuilderFactory .get("step1") . chunk(65000)//1 .reader(reader)//2 .processor(processor)//3 .writer(writer)//4 .build(); } @Bean publicCsvJobListenercsvJobListener(){ returnnewCsvJobListener(); } @Bean publicValidator csvBeanValidator(){ returnnewCsvBeanValidator (); } }
在配置文件中启动自动执行批处理
spring.batch.job.names=job1,job2#启动时要执行的Job,默认执行全部Job
spring.batch.job.enabled=true#是否自动执行定义的Job,默认是
spring.batch.initializer.enabled=true#是否初始化SpringBatch的数据库,默认为是
spring.batch.schema=
spring.batch.table-prefix=#设置SpringBatch的数据库表的前缀
项目汇总
从项目中我们可以看到总的步骤就是首先读取我们需要实现的文件进行解析,然后转换成需要的实体类并且绑定到reader中,二实现我们需要的writer并且帮到到数据库上,三实现job监听器将其绑定到步骤中。最后开启批处理自动执行入库即可。这个简单步骤主要是配置中用到的理解流程自己也可以方便实现批处理的流程。
总结
以上所述是小编给大家介绍的SpringBoot和SpringBatch使用,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!