Spring Boot 1.5.2 集成 Kafka 的简单例子
本文介绍了 Spring Boot 1.5.2 集成 Kafka 的简单例子,分享给大家,具体如下:
随着 Spring Boot 1.5 版本的发布(新增了对 Kafka 的自动配置,可通过 spring.kafka.* 属性进行设置),在 Spring 项目中与 Kafka 集成更为简便。
添加依赖
compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE")
添加application.properties
#kafka
#指定kafka代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092
#指定默认消费者groupid
spring.kafka.consumer.group-id=myGroup
#指定默认topic
spring.kafka.template.default-topic=my-replicated-topic
#指定listener容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
#生产者单个批次的大小上限(单位为字节,而不是消息条数)
spring.kafka.producer.batch-size=1000
configuration启用kafka
package cn.xiaojf.today.data.kafka.configuration;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
/**
*kafka配置
*@authorxiaojf2017/3/2414:09
*/
@Configuration
@EnableKafka
publicclassKafkaConfiguration{
}
消息生产者
package cn.xiaojf.today.data.kafka.producer;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
/**
*消息生产者
*@authorxiaojf2017/3/2414:36
*/
@Component
publicclassMsgProducer{
@Autowired
privateKafkaTemplatekafkaTemplate;
publicvoidsend(){
kafkaTemplate.send("my-replicated-topic","xiaojf");
kafkaTemplate.send("my-replicated-topic","xiaojf");
kafkaTemplate.metrics();
kafkaTemplate.execute(newKafkaOperations.ProducerCallback(){
@Override
publicObjectdoInKafka(Producerproducer){
//这里可以编写kafka原生的api操作
returnnull;
}
});
//消息发送的监听器,用于回调返回信息
kafkaTemplate.setProducerListener(newProducerListener(){
@Override
publicvoidonSuccess(Stringtopic,Integerpartition,Stringkey,Stringvalue,RecordMetadatarecordMetadata){
}
@Override
publicvoidonError(Stringtopic,Integerpartition,Stringkey,Stringvalue,Exceptionexception){
}
@Override
publicbooleanisInterestedInSuccess(){
returnfalse;
}
});
}
}
消息消费者
package cn.xiaojf.today.data.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
*消息消费者
*@authorxiaojf2017/3/2414:36
*/
@Component
publicclassMsgConsumer{
@KafkaListener(topics={"my-replicated-topic","my-replicated-topic2"})
publicvoidprocessMessage(Stringcontent){
System.out.println(content);
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。