Integrating Kafka with Spring
Contents:
- Preparation
- Configuration files
- pom configuration (or download the jars directly)
- producer configuration
- consumer configuration
- applicationContext configuration
- Implementation
- Full project code
Preparation
Kafka version: kafka_2.10-0.10.1.0
Spring version: Spring 4.3
Configuration files
pom configuration (or download the jars directly)
Support library for the Kafka/Spring integration; it provides the communication and listener glue between Spring and Kafka:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
Client library used for sending and receiving Kafka messages:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>
A newer Spring version is used because older versions do not support the Kafka listener and integrate poorly with Kafka:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.3.0.RELEASE</version>
</dependency>
spring-kafka ships the listener itself; it depends on Spring, so it needs to be used together with spring-integration-kafka:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RC1</version>
</dependency>
producer configuration
1. If you do not set a topic name, messages go to the default topic, and the corresponding data directory is created under the default topic's name.
2. producerListener reports whether a Kafka send succeeded and returns feedback about the send.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- producer parameters -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:7000"/>
                <entry key="group.id" value="0"/>
                <entry key="retries" value="1"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- producerFactory bean used to create the KafkaTemplate -->
    <bean id="producerFactory"
        class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>

    <!-- KafkaTemplate bean: inject this bean and call its send methods to publish messages -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="defaultTopic"/>
        <property name="producerListener" ref="producerListener"/>
    </bean>

    <bean id="producerListener" class="com.git.kafka.producer.KafkaProducerListener"/>
</beans>
consumer configuration
1. Message consumption is driven by Kafka's listener: when a message arrives, the container automatically calls the onMessage method, which consumes the message and runs the follow-up business logic.
2. To consume multiple topics, create an additional consumer container for each topic and point them all at the same listener class, so that one class handles all follow-up processing.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- consumer parameters -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:7000"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- class that actually consumes the messages -->
    <bean id="messageListernerConsumerService" class="com.git.kafka.consumer.KafkaConsumerServer"/>

    <!-- consumer container configuration -->
    <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="order_test_topic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>
    <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="other_test_topic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>

    <!-- messageListenerContainer beans: inject one of these beans to use it -->
    <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_trade"/>
    </bean>

    <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_other"/>
    </bean>

</beans>
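Note that enable.auto.commit is set to false above, which hands offset committing over to the listener container (by default it commits after records have been processed). If offsets should only be committed once the business logic has definitely succeeded, one option in spring-kafka is manual acknowledgment. A hedged sketch, assuming the ackMode property and MANUAL value exposed by the spring-kafka 1.0.x ContainerProperties API:

```xml
<!-- Sketch: manual offset acknowledgment (assumes spring-kafka 1.0.x API) -->
<bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="order_test_topic"/>
    <property name="messageListener" ref="messageListernerConsumerService"/>
    <!-- offsets are committed only when the listener calls acknowledge() -->
    <property name="ackMode" value="MANUAL"/>
</bean>
```

With this setting the listener class would implement AcknowledgingMessageListener<String, String> instead of MessageListener and call acknowledgment.acknowledge() after its business logic completes.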
applicationContext configuration
<import resource="classpath:kafkaConsumer.xml"/>
<import resource="classpath:kafkaProducer.xml"/>
Implementation
KafkaMesConstant.java // constants class
package com.git.kafka.constant;

/**
 * Kafka message constants
 * @author wangb
 */
public class KafkaMesConstant {

    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "success";

    /* kafka codes */
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";

    /* kafka messages */
    public static final String KAFKA_SEND_ERROR_MES = "Send timed out; contact the technical team";
    public static final String KAFKA_NO_RESULT_MES = "No result returned; contact the technical team";
    public static final String KAFKA_NO_OFFSET_MES = "No offset found in the returned data; contact the technical team";

}
KafkaConsumerServer.java // consumer listener
package com.git.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

/**
 * Kafka listener entry point.
 * Automatically watches for messages that need to be consumed.
 * @author wangb
 */
public class KafkaConsumerServer implements MessageListener<String, String> {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");

    /**
     * Invoked automatically by the listener container.
     * Consumes the message, lets the offset be committed,
     * and runs the business logic.
     * (The high-level API offers no offset management, so consumption
     * cannot start from a chosen offset.)
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        LOG.info("============= kafkaConsumer start =============");
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        LOG.info("-------------topic:" + topic);
        LOG.info("-------------value:" + value);
        LOG.info("-------------key:" + key);
        LOG.info("-------------offset:" + offset);
        LOG.info("-------------partition:" + partition);
        LOG.info("~~~~~~~~~~~~~ kafkaConsumer done ~~~~~~~~~~~~~");
    }

}
KafkaProducerListener.java // producer listener: logs send results
package com.git.kafka.producer;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;

/**
 * KafkaProducer listener, enabled in the producer configuration file.
 * @author wangb
 */
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");

    /**
     * Called after a message has been sent successfully.
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key,
            Object value, RecordMetadata recordMetadata) {
        LOG.info("========== kafka send succeeded (log start) ==========");
        LOG.info("----------topic:" + topic);
        LOG.info("----------partition:" + partition);
        LOG.info("----------key:" + key);
        LOG.info("----------value:" + value);
        LOG.info("----------RecordMetadata:" + recordMetadata);
        LOG.info("~~~~~~~~~~ kafka send succeeded (log end) ~~~~~~~~~~");
    }

    /**
     * Called when a send fails.
     */
    @Override
    public void onError(String topic, Integer partition, Object key,
            Object value, Exception exception) {
        LOG.info("========== kafka send failed (log start) ==========");
        LOG.info("----------topic:" + topic);
        LOG.info("----------partition:" + partition);
        LOG.info("----------key:" + key);
        LOG.info("----------value:" + value);
        LOG.info("----------Exception:" + exception);
        LOG.info("~~~~~~~~~~ kafka send failed (log end) ~~~~~~~~~~");
        exception.printStackTrace();
    }

    /**
     * The return value controls whether success callbacks (onSuccess) are delivered.
     */
    @Override
    public boolean isInterestedInSuccess() {
        LOG.info("/// kafkaProducer listener enabled ///");
        return true;
    }
}
KafkaProducerServer.java // producer
package com.git.kafka.producer;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import com.alibaba.fastjson.JSON;
import com.git.kafka.constant.KafkaMesConstant;

/**
 * KafkaProducer template.
 * Use this template to send messages.
 * @author wangb
 */
@Component
public class KafkaProducerServer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Kafka send template.
     * @param topic        topic name
     * @param value        message value
     * @param ifPartition  whether to pick a partition explicitly: "0" = yes, "1" = no
     * @param partitionNum number of partitions; must be greater than 0 when ifPartition is "0"
     * @param role         caller role (bbc, app, erp, ...), used to build the key
     */
    public Map<String, Object> sndMesForTemplate(String topic, Object value, String ifPartition,
            Integer partitionNum, String role) {
        String key = role + "-" + value.hashCode();
        String valueString = JSON.toJSONString(value);
        if (ifPartition.equals("0")) {
            // send to an explicitly chosen partition
            int partitionIndex = getPartitionIndex(key, partitionNum);
            ListenableFuture<SendResult<String, String>> result =
                    kafkaTemplate.send(topic, partitionIndex, key, valueString);
            return checkProRecord(result);
        } else {
            ListenableFuture<SendResult<String, String>> result =
                    kafkaTemplate.send(topic, key, valueString);
            return checkProRecord(result);
        }
    }

    /**
     * Derive the partition index from the key.
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        } else {
            return Math.abs(key.hashCode()) % partitionNum;
        }
    }

    /**
     * Check the send result record.
     */
    @SuppressWarnings("rawtypes")
    private Map<String, Object> checkProRecord(ListenableFuture<SendResult<String, String>> res) {
        Map<String, Object> m = new HashMap<String, Object>();
        if (res != null) {
            try {
                SendResult r = res.get(); // blocks until the send result is available
                /* check the offset in recordMetadata; the producerRecord is not checked */
                Long offsetIndex = r.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    m.put("code", KafkaMesConstant.SUCCESS_CODE);
                    m.put("message", KafkaMesConstant.SUCCESS_MES);
                } else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
            } catch (ExecutionException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
            }
        } else {
            m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
            m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
        }
        return m;
    }
}
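The modulo rule in getPartitionIndex can be checked in isolation. A small standalone sketch (the class name is hypothetical; the key literal just mimics the role-hashCode key format used above):

```java
// Standalone sketch of the partition-selection rule in getPartitionIndex:
// a non-null key always maps to the same partition, and the result
// stays within [0, partitionNum).
public class PartitionIndexDemo {

    static int partitionIndex(String key, int partitionNum) {
        // same modulo rule as getPartitionIndex (non-null key case)
        return Math.abs(key.hashCode()) % partitionNum;
    }

    public static void main(String[] args) {
        int idx = partitionIndex("test-3556498", 3);
        System.out.println(idx >= 0 && idx < 3);                  // always in range: true
        System.out.println(idx == partitionIndex("test-3556498", 3)); // deterministic: true
    }
}
```

One caveat: Math.abs(Integer.MIN_VALUE) is still negative, so a key whose hashCode() happens to be Integer.MIN_VALUE would yield a negative index; (key.hashCode() & Integer.MAX_VALUE) % partitionNum avoids that edge case.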
KafkaProducerTest.java // Kafka producer test (the consumer is started by Spring's listener, which invokes onMessage automatically)
package com.git.test;

import java.util.Map;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.git.kafka.producer.KafkaProducerServer;

public class KafkaProducerTest {

    public static void main(String[] args) {
        // Fetch the producer from the Spring context so that kafkaTemplate gets
        // @Autowired; creating it with "new" would leave the template null.
        // (Assumes component scanning of com.git.kafka is enabled in the context.)
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("applicationContext.xml");
        KafkaProducerServer kafkaProducer = context.getBean(KafkaProducerServer.class);

        String topic = "orderTopic";
        String value = "test";
        String ifPartition = "0";
        Integer partitionNum = 3;
        String role = "test"; // used to build the message key
        Map<String, Object> res = kafkaProducer.sndMesForTemplate(
                topic, value, ifPartition, partitionNum, role);

        System.out.println("Test result:===============");
        String message = (String) res.get("message");
        String code = (String) res.get("code");
        System.out.println("code:" + code);
        System.out.println("message:" + message);
        context.close();
    }
}
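Because sndMesForTemplate reports its outcome through the returned Map rather than by throwing, callers should branch on the code entry, as the test above prints. A standalone sketch of that check (the class name is hypothetical; the literal codes mirror KafkaMesConstant):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: interpreting the result Map returned by sndMesForTemplate.
// "00000" is SUCCESS_CODE in KafkaMesConstant; every other code is an error.
public class SendResultCheck {

    static boolean isSuccess(Map<String, Object> res) {
        return res != null && "00000".equals(res.get("code"));
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<String, Object>();
        ok.put("code", "00000");
        System.out.println(isSuccess(ok));  // true

        Map<String, Object> err = new HashMap<String, Object>();
        err.put("code", "30001"); // KAFKA_SEND_ERROR_CODE
        System.out.println(isSuccess(err)); // false
    }
}
```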
Full project code
Project repository: https://git.oschina.net/wsmd/kafka-0.10-demo
Original post: https://www.cnblogs.com/wangb0402/p/6187796.html