RocketMq事务消息发送代码流程详解
一、RocketMq事务消息流程:
1、首先会向broker发送一个预请求消息,消费者不可见
2、回调执行本地事务(比如操作数据库)
3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。
二、RocketMq事务消息实例:
1、引入rocketMq相关的依赖:
org.apache.rocketmq rocketmq-client 4.4.0
2、创建一个TransactionProducer类:
publicclassTransactionProducer{ publicstaticvoidmain(String[]args)throwsMQClientException,RemotingException,InterruptedException,MQBrokerException,UnsupportedEncodingException{ //创建生产者并制定组名 TransactionMQProducerproducer=newTransactionMQProducer("rocketMQ_transaction_producer_group"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.***.***:9876"); //3、指定消息监听对象用于执行本地事务和消息回查 TransactionListenerlistener=newTransactionListenerImol(); producer.setTransactionListener(listener); //4、线程池 ExecutorServiceexecutorService=newThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,newArrayBlockingQueue(2000),newThreadFactory(){ @Override publicThreadnewThread(Runnabler){ Threadthread=newThread(r); thread.setName("client-tanscation-msg-check-thread"); returnthread; } }); producer.setExecutorService(executorService); //5、启动producer producer.start(); //6.创建消息对象,指定主题Topic、Tag和消息体Stringtopic,Stringtags,Stringkeys,byte[]body Messagemessage=newMessage("Topic_transaction_demo",//主题 "Tags",//主要用于消息过滤 "Key_1",//消息唯一值 ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET)); //7、发送事务消息 TransactionSendResultresult=producer.sendMessageInTransaction(message,"hello-transaction"); producer.shutdown(); } }
3、发送事务消息还需要一个事务监听对象,它实现TransactionListener接口,其中有两个方法作用分别是执行本地事务和消息回查:
publicclassTransactionListenerImolimplementsTransactionListener{ //存储事务状态信息key:事务idvalue:当前事务执行的状态 privateConcurrentHashMaplocalTrans=newConcurrentHashMap<>(); //执行本地事务 @Override publicLocalTransactionStateexecuteLocalTransaction(Messagemessage,Objecto){ //事务id StringtransactionId=message.getTransactionId(); //0:执行中,状态未知1:执行成功2:执行失败 localTrans.put(transactionId,0); //业务执行,本地事务,service System.out.println("hello-demo-transaction"); try{ System.out.println("正在执行本地事务---"); Thread.sleep(60000*2); System.out.println("本地事务执行成功---"); localTrans.put(transactionId,1); }catch(InterruptedExceptione){ e.printStackTrace(); localTrans.put(transactionId,2); returnLocalTransactionState.ROLLBACK_MESSAGE; } returnLocalTransactionState.COMMIT_MESSAGE; } //消息回查 @Override publicLocalTransactionStatecheckLocalTransaction(MessageExtmessageExt){ //获取对应事务的状态信息 StringtransactionId=messageExt.getTransactionId(); //获取对应事务id执行状态 Integerstatus=localTrans.get(transactionId); //消息回查 System.out.println("消息回查---transactionId:"+transactionId+"状态:"+status); switch(status){ case0: returnLocalTransactionState.UNKNOW; case1: returnLocalTransactionState.COMMIT_MESSAGE; case2: returnLocalTransactionState.ROLLBACK_MESSAGE; } returnLocalTransactionState.UNKNOW; } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。