RocketMQ获取指定消息的实现方法(源码)
概要
消息查询是什么?
消息查询就是根据用户提供的msgId从MQ中取出该消息
RocketMQ如果有多个节点如何查询?
问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上。客户端怎么知道数据该去哪个节点上查?
猜想1:逐个访问broker节点查询数据
猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容
实际:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及该消息在CommitLog中的偏移量。
2.客户端实现会从msgId字符串中解析出broker地址,向指定broker节查询消息。
问题:CommitLog文件有多个,只有偏移量估计不能确定在哪个文件吧?
实际:单个Broker节点内offset是全局唯一的,不是每个CommitLog文件的偏移量都是从0开始的。单个节点内所有CommitLog文件共用一套偏移量,每个文件的文件名为其第一个消息的偏移量。所以可以根据偏移量和文件名确定CommitLog文件。
源码阅读
0.使用方式
MessageExt msg=consumer.viewMessage(msgId);
1.消息ID解析
这个了解下就可以了
publicclassMessageId{
privateSocketAddressaddress;
privatelongoffset;
publicMessageId(SocketAddressaddress,longoffset){
this.address=address;
this.offset=offset;
}
//get-set
}
//fromMQAdminImpl.java
publicMessageExtviewMessage(
StringmsgId)throwsRemotingException,MQBrokerException,InterruptedException,MQClientException{
MessageIdmessageId=null;
try{
//从msgId字符串中解析出address和offset
//address=ip:port
//offset为消息在CommitLog文件中的偏移量
messageId=MessageDecoder.decodeMessageId(msgId);
}catch(Exceptione){
thrownewMQClientException(ResponseCode.NO_MESSAGE,"querymessagebyidfinished,butnomessage.");
}
returnthis.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
messageId.getOffset(),timeoutMillis);
}
//fromMessageDecoder.java
publicstaticMessageIddecodeMessageId(finalStringmsgId)throwsUnknownHostException{
SocketAddressaddress;
longoffset;
//ipv4和ipv6的区别
//如果msgId总长度超过32字符,则为ipv6
intipLength=msgId.length()==32?4*2:16*2;
byte[]ip=UtilAll.string2bytes(msgId.substring(0,ipLength));
byte[]port=UtilAll.string2bytes(msgId.substring(ipLength,ipLength+8));
ByteBufferbb=ByteBuffer.wrap(port);
intportInt=bb.getInt(0);
address=newInetSocketAddress(InetAddress.getByAddress(ip),portInt);
//offset
byte[]data=UtilAll.string2bytes(msgId.substring(ipLength+8,ipLength+8+16));
bb=ByteBuffer.wrap(data);
offset=bb.getLong(0);
returnnewMessageId(address,offset);
}
2.长连接客户端RPC实现
要发请求首先得先建立连接,这里方法可以看到创建连接相关的操作。值得注意的是,第一次访问的时候可能连接还没建立,建立连接需要消耗一段时间。代码中对这个时间也做了判断,如果连接建立完成后,发现已经超时,则不再发出请求。目的应该是尽可能减少请求线程的阻塞时间。
//fromNettyRemotingClient.java
@Override
publicRemotingCommandinvokeSync(Stringaddr,finalRemotingCommandrequest,longtimeoutMillis)
throwsInterruptedException,RemotingConnectException,RemotingSendRequestException,RemotingTimeoutException{
longbeginStartTime=System.currentTimeMillis();
//这里会先检查有无该地址的通道,有则返回,无则创建
finalChannelchannel=this.getAndCreateChannel(addr);
if(channel!=null&&channel.isActive()){
try{
//前置钩子
doBeforeRpcHooks(addr,request);
//判断通道建立完成时是否已到达超时时间,如果超时直接抛出异常。不发请求
longcostTime=System.currentTimeMillis()-beginStartTime;
if(timeoutMillis
下一步看看它的同步调用做了什么处理。注意到它会构建一个Future对象加入待响应池,发出请求报文后就挂起线程,然后等待唤醒(waitResponse内部使用CountDownLatch等待)。
//fromNettyRemotingAbstract.javapublicRemotingCommandinvokeSyncImpl(finalChannelchannel,finalRemotingCommandrequest,
finallongtimeoutMillis)
throwsInterruptedException,RemotingSendRequestException,RemotingTimeoutException{
//请求id
finalintopaque=request.getOpaque();
try{
//请求存根
finalResponseFutureresponseFuture=newResponseFuture(channel,opaque,timeoutMillis,null,null);
//加入待响应的请求池
this.responseTable.put(opaque,responseFuture);
finalSocketAddressaddr=channel.remoteAddress();
//将请求发出,成功发出时更新状态
channel.writeAndFlush(request).addListener(newChannelFutureListener(){
@Override
publicvoidoperationComplete(ChannelFuturef)throwsException{
if(f.isSuccess()){//若成功发出,更新请求状态为“已发出”
responseFuture.setSendRequestOK(true);
return;
}else{
responseFuture.setSendRequestOK(false);
}
//若发出失败,则从池中移除(没用了,释放资源)
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
//putResponse的时候会唤醒等待的线程
responseFuture.putResponse(null);
log.warn("sendarequestcommandtochannel<"+addr+">failed.");
}
});
//只等待一段时间,不会一直等下去
//若正常响应,则收到响应后,此线程会被唤醒,继续执行下去
//若超时,则到达该时间后线程苏醒,继续执行
RemotingCommandresponseCommand=responseFuture.waitResponse(timeoutMillis);
if(null==responseCommand){
if(responseFuture.isSendRequestOK()){
thrownewRemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr),timeoutMillis,
responseFuture.getCause());
}else{
thrownewRemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr),responseFuture.getCause());
}
}
returnresponseCommand;
}finally{
//正常响应完成时,将future释放(正常逻辑)
//超时时,将future释放。这个请求已经作废了,后面如果再收到响应,就可以直接丢弃了(由于找不到相关的响应钩子,就不处理了)
this.responseTable.remove(opaque);
}
}
好,我们再来看看收到报文的时候是怎么处理的。我们都了解JDK中的Future的原理,大概就是将这个任务提交给其他线程处理,该线程处理完毕后会将结果写入到Future对象中,写入时如果有线程在等待该结果,则唤醒这些线程。这里也差不多,只不过执行线程在服务端,服务执行完毕后会将结果通过长连接发送给客户端,客户端收到后根据报文中的ID信息从待响应池中找到Future对象,然后就是类似的处理了。
classNettyClientHandlerextendsSimpleChannelInboundHandler{
//底层解码完毕得到RemotingCommand的报文
@Override
protectedvoidchannelRead0(ChannelHandlerContextctx,RemotingCommandmsg)throwsException{
processMessageReceived(ctx,msg);
}
}
publicvoidprocessMessageReceived(ChannelHandlerContextctx,RemotingCommandmsg)throwsException{
finalRemotingCommandcmd=msg;
if(cmd!=null){
//判断类型
switch(cmd.getType()){
caseREQUEST_COMMAND:
processRequestCommand(ctx,cmd);
break;
caseRESPONSE_COMMAND:
processResponseCommand(ctx,cmd);
break;
default:
break;
}
}
}
publicvoidprocessResponseCommand(ChannelHandlerContextctx,RemotingCommandcmd){
//取得消息id
finalintopaque=cmd.getOpaque();
//从待响应池中取得对应请求
finalResponseFutureresponseFuture=responseTable.get(opaque);
if(responseFuture!=null){
//将响应值注入到ResponseFuture对象中,等待线程可从这个对象获取结果
responseFuture.setResponseCommand(cmd);
//请求已处理完毕,释放该请求
responseTable.remove(opaque);
//如果有回调函数的话则回调(由当前线程处理)
if(responseFuture.getInvokeCallback()!=null){
executeInvokeCallback(responseFuture);
}else{
//没有的话,则唤醒等待线程(由等待线程做处理)
responseFuture.putResponse(cmd);
responseFuture.release();
}
}else{
log.warn("receiveresponse,butnotmatchedanyrequest,"+RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
总结一下,客户端的处理时序大概是这样的:
结构大概是这样的:
3.服务端的处理
//todo服务端待补充CommitLog文件映射相关内容
classNettyServerHandlerextendsSimpleChannelInboundHandler{
@Override
protectedvoidchannelRead0(ChannelHandlerContextctx,RemotingCommandmsg)throwsException{
processMessageReceived(ctx,msg);
}
}
//fromNettyRemotingAbscract.java
publicvoidprocessMessageReceived(ChannelHandlerContextctx,RemotingCommandmsg)throwsException{
finalRemotingCommandcmd=msg;
if(cmd!=null){
switch(cmd.getType()){
caseREQUEST_COMMAND://服务端走这里
processRequestCommand(ctx,cmd);
break;
caseRESPONSE_COMMAND:
processResponseCommand(ctx,cmd);
break;
default:
break;
}
}
}
//fromNettyRemotingAbscract.java
publicvoidprocessRequestCommand(finalChannelHandlerContextctx,finalRemotingCommandcmd){
//查看有无该请求code相关的处理器
finalPairmatched=this.processorTable.get(cmd.getCode());
//如果没有,则使用默认处理器(可能没有默认处理器)
finalPairpair=null==matched?this.defaultRequestProcessor:matched;
finalintopaque=cmd.getOpaque();
if(pair!=null){
Runnablerun=newRunnable(){
@Override
publicvoidrun(){
try{
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd);
finalRemotingResponseCallbackcallback=newRemotingResponseCallback(){
@Override
publicvoidcallback(RemotingCommandresponse){
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd,response);
if(!cmd.isOnewayRPC()){
if(response!=null){//不为null,则由本类将响应值写会给请求方
response.setOpaque(opaque);
response.markResponseType();
try{
ctx.writeAndFlush(response);
}catch(Throwablee){
log.error("processrequestover,butresponsefailed",e);
log.error(cmd.toString());
log.error(response.toString());
}
}else{//为null,意味着processor内部已经将响应处理了,这里无需再处理。
}
}
}
};
if(pair.getObject1()instanceofAsyncNettyRequestProcessor){//QueryMessageProcessor为异步处理器
AsyncNettyRequestProcessorprocessor=(AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx,cmd,callback);
}else{
NettyRequestProcessorprocessor=pair.getObject1();
RemotingCommandresponse=processor.processRequest(ctx,cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd,response);
callback.callback(response);
}
}catch(Throwablee){
log.error("processrequestexception",e);
log.error(cmd.toString());
if(!cmd.isOnewayRPC()){
finalRemotingCommandresponse=RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if(pair.getObject1().rejectRequest()){
finalRemotingCommandresponse=RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]systembusy,startflowcontrolforawhile");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try{
finalRequestTaskrequestTask=newRequestTask(run,ctx.channel(),cmd);
pair.getObject2().submit(requestTask);
}catch(RejectedExecutionExceptione){
if((System.currentTimeMillis()%10000)==0){
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+",toomanyrequestsandsystemthreadpoolbusy,RejectedExecutionException"
+pair.getObject2().toString()
+"requestcode:"+cmd.getCode());
}
if(!cmd.isOnewayRPC()){
finalRemotingCommandresponse=RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]systembusy,startflowcontrolforawhile");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}else{
Stringerror="requesttype"+cmd.getCode()+"notsupported";
finalRemotingCommandresponse=
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+error);
}
}
//fromQueryMessageProcesor.java
@Override
publicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)
throwsRemotingCommandException{
switch(request.getCode()){
caseRequestCode.QUERY_MESSAGE:
returnthis.queryMessage(ctx,request);
caseRequestCode.VIEW_MESSAGE_BY_ID://通过msgId查询消息
returnthis.viewMessageById(ctx,request);
default:
break;
}
returnnull;
}
publicRemotingCommandviewMessageById(ChannelHandlerContextctx,RemotingCommandrequest)
throwsRemotingCommandException{
finalRemotingCommandresponse=RemotingCommand.createResponseCommand(null);
finalViewMessageRequestHeaderrequestHeader=
(ViewMessageRequestHeader)request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
//getMessagetStore得到当前映射到内存中的CommitLog文件,然后根据偏移量取得数据
finalSelectMappedBufferResultselectMappedBufferResult=
this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
if(selectMappedBufferResult!=null){
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
//将响应通过socket写回给客户端
try{
//response对象的数据作为header
//消息内容作为body
FileRegionfileRegion=
newOneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
ctx.channel().writeAndFlush(fileRegion).addListener(newChannelFutureListener(){
@Override
publicvoidoperationComplete(ChannelFuturefuture)throwsException{
selectMappedBufferResult.release();
if(!future.isSuccess()){
log.error("Transferonemessagefrompagecachefailed,",future.cause());
}
}
});
}catch(Throwablee){
log.error("",e);
selectMappedBufferResult.release();
}
returnnull;//如果有值,则直接写回给请求方。这里返回null是不需要由外层处理响应。
}else{
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("cannotfindmessagebytheoffset,"+requestHeader.getOffset());
}
returnresponse;
}
总结
到此这篇关于RocketMQ获取指定消息的文章就介绍到这了,更多相关RocketMQ获取指定消息内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。