spring boot整合spring-kafka实现发送接收消息实例代码
前言
由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka.其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.
没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.
实现方法
pom.xml文件如下
4.0.0 org.linuxsogood.sync linuxsogood-sync 1.0.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 1.4.0.RELEASE 1.8 3.3.1 1.2.4 3.3.6 4.1.1 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-aop org.springframework.boot spring-boot-starter-freemarker org.springframework.kafka spring-kafka 1.1.0.RELEASE junit junit 4.12 test org.assertj assertj-core 3.5.2 org.hamcrest hamcrest-all 1.3 test org.mockito mockito-all 1.9.5 test org.springframework spring-test 4.2.3.RELEASE test org.springframework.boot spring-boot-starter-test test mysql mysql-connector-java com.microsoft.sqlserver sqljdbc4 4.0.0 com.alibaba druid 1.0.11 org.mybatis mybatis ${mybatis.version} org.mybatis mybatis-spring ${mybatis.spring.version} org.mybatis.generator mybatis-generator-core 1.3.2 compile true com.github.pagehelper pagehelper ${pagehelper.version} tk.mybatis mapper ${mapper.version} com.alibaba fastjson 1.2.17 repo.spring.io.milestone SpringFrameworkMavenMilestoneRepository https://repo.spring.io/libs-milestone mybatis_generator org.mybatis.generator mybatis-generator-maven-plugin 1.3.2 true true org.springframework.boot spring-boot-maven-plugin org.linuxsogood.sync.Starter
ORM层使用了MyBatis,并配合使用了通用Mapper和分页插件.
kafka消费端配置
importorg.linuxsogood.sync.listener.Listener; importorg.apache.kafka.clients.consumer.ConsumerConfig; importorg.apache.kafka.common.serialization.StringDeserializer; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importorg.springframework.kafka.annotation.EnableKafka; importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; importorg.springframework.kafka.config.KafkaListenerContainerFactory; importorg.springframework.kafka.core.ConsumerFactory; importorg.springframework.kafka.core.DefaultKafkaConsumerFactory; importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer; importjava.util.HashMap; importjava.util.Map; @Configuration @EnableKafka publicclassKafkaConsumerConfig{ @Value("${kafka.broker.address}") privateStringbrokerAddress; @Bean KafkaListenerContainerFactory>kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory factory=newConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); returnfactory; } @Bean publicConsumerFactory consumerFactory(){ returnnewDefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean publicMap consumerConfigs(){ Map propsMap=newHashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,this.brokerAddress); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"firehome-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); returnpropsMap; } @Bean 
publicListenerlistener(){ returnnewListener(); } }
生产者的配置.
importorg.apache.kafka.clients.producer.ProducerConfig; importorg.apache.kafka.common.serialization.StringSerializer; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importorg.springframework.kafka.annotation.EnableKafka; importorg.springframework.kafka.core.DefaultKafkaProducerFactory; importorg.springframework.kafka.core.KafkaTemplate; importorg.springframework.kafka.core.ProducerFactory; importjava.util.HashMap; importjava.util.Map; @Configuration @EnableKafka publicclassKafkaProducerConfig{ @Value("${kafka.broker.address}") privateStringbrokerAddress; @Bean publicProducerFactoryproducerFactory(){ returnnewDefaultKafkaProducerFactory<>(producerConfigs()); } @Bean publicMap producerConfigs(){ Map props=newHashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG,0); props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); props.put(ProducerConfig.LINGER_MS_CONFIG,1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); returnprops; } @Bean publicKafkaTemplate kafkaTemplate(){ returnnewKafkaTemplate (producerFactory()); } }
监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理.如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.
在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式
importcom.alibaba.fastjson.JSON; importorg.linuxsogood.qilian.enums.CupMessageType; importorg.linuxsogood.qilian.kafka.MessageWrapper; importorg.linuxsogood.qilian.model.store.Store; importorg.linuxsogood.sync.mapper.StoreMapper; importorg.linuxsogood.sync.model.StoreExample; importorg.apache.commons.lang3.StringUtils; importorg.apache.kafka.clients.consumer.ConsumerRecord; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.kafka.annotation.KafkaListener; importjava.util.List; importjava.util.Optional; publicclassListener{ privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(Listener.class); @Autowired privateStoreMapperstoreMapper; /** *监听kafka消息,如果有消息则消费,同步数据到新烽火的库 *@paramrecord消息实体bean */ @KafkaListener(topics="linuxsogood-topic",group="sync-group") publicvoidlisten(ConsumerRecord,?>record){ Optional>kafkaMessage=Optional.ofNullable(record.value()); if(kafkaMessage.isPresent()){ Objectmessage=kafkaMessage.get(); try{ MessageWrappermessageWrapper=JSON.parseObject(message.toString(),MessageWrapper.class); CupMessageTypetype=messageWrapper.getType(); //判断消息的数据类型,不同的数据入不同的表 if(CupMessageType.STORE==type){ proceedStore(messageWrapper); } }catch(Exceptione){ LOGGER.error("将接收到的消息保存到数据库时异常,消息:{},异常:{}",message.toString(),e); } } } /** *消息是店铺类型,店铺消息处理入库 *@parammessageWrapper从kafka中得到的消息 */ privatevoidproceedStore(MessageWrappermessageWrapper){ Objectdata=messageWrapper.getData(); StorecupStore=JSON.parseObject(data.toString(),Store.class); StoreExamplestoreExample=newStoreExample(); StringstoreName=StringUtils.isBlank(cupStore.getStoreOldName())?cupStore.getStoreName():cupStore.getStoreOldName(); storeExample.createCriteria().andStoreNameEqualTo(storeName); Liststores=storeMapper.selectByExample(storeExample); org.linuxsogood.sync.model.StoreconvertStore=neworg.linuxsogood.sync.model.Store(); org.linuxsogood.sync.model.Storestore=convertStore.convert(cupStore); //如果查询不到记录则新增 
if(stores.size()==0){ storeMapper.insert(store); }else{ store.setStoreId(stores.get(0).getStoreId()); storeMapper.updateByPrimaryKey(store); } } }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。