PHP基于rabbitmq操作类的生产者和消费者功能示例
本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:
注意事项:
1、accept.php消费者代码需要在命令行执行
2、'username'=>'asdf','password'=>'123456'改成自己的帐号和密码
RabbitMQCommand.php操作类代码
$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/')
*/
publicfunction__construct($configs=array(),$exchange_name='',$queue_name='',$route_key=''){
$this->setConfigs($configs);
$this->exchange_name=$exchange_name;
$this->queue_name=$queue_name;
$this->route_key=$route_key;
}
privatefunctionsetConfigs($configs){
if(!is_array($configs)){
thrownewException('configsisnotarray');
}
if(!($configs['host']&&$configs['port']&&$configs['username']&&$configs['password'])){
thrownewException('configsisempty');
}
if(empty($configs['vhost'])){
$configs['vhost']='/';
}
$configs['login']=$configs['username'];
unset($configs['username']);
$this->configs=$configs;
}
/*
*设置是否持久化,默认为True
*/
publicfunctionsetDurable($durable){
$this->durable=$durable;
}
/*
*设置是否自动删除
*/
publicfunctionsetAutoDelete($autodelete){
$this->autodelete=$autodelete;
}
/*
*设置是否镜像
*/
publicfunctionsetMirror($mirror){
$this->mirror=$mirror;
}
/*
*打开amqp连接
*/
privatefunctionopen(){
if(!$this->_conn){
try{
$this->_conn=newAMQPConnection($this->configs);
$this->_conn->connect();
$this->initConnection();
}catch(AMQPConnectionException$ex){
thrownewException('cannotconnectionrabbitmq',500);
}
}
}
/*
*rabbitmq连接不变
*重置交换机,队列,路由等配置
*/
publicfunctionreset($exchange_name,$queue_name,$route_key){
$this->exchange_name=$exchange_name;
$this->queue_name=$queue_name;
$this->route_key=$route_key;
$this->initConnection();
}
/*
*初始化rabbit连接的相关配置
*/
privatefunctioninitConnection(){
if(empty($this->exchange_name)||empty($this->queue_name)||empty($this->route_key)){
thrownewException('rabbitmqexchange_nameorqueue_nameorroute_keyisempty',500);
}
$this->_channel=newAMQPChannel($this->_conn);
$this->_exchange=newAMQPExchange($this->_channel);
$this->_exchange->setName($this->exchange_name);
$this->_exchange->setType(AMQP_EX_TYPE_DIRECT);
if($this->durable)
$this->_exchange->setFlags(AMQP_DURABLE);
if($this->autodelete)
$this->_exchange->setFlags(AMQP_AUTODELETE);
$this->_exchange->declare();
$this->_queue=newAMQPQueue($this->_channel);
$this->_queue->setName($this->queue_name);
if($this->durable)
$this->_queue->setFlags(AMQP_DURABLE);
if($this->autodelete)
$this->_queue->setFlags(AMQP_AUTODELETE);
if($this->mirror)
$this->_queue->setArgument('x-ha-policy','all');
$this->_queue->declare();
$this->_queue->bind($this->exchange_name,$this->route_key);
}
publicfunctionclose(){
if($this->_conn){
$this->_conn->disconnect();
}
}
publicfunction__sleep(){
$this->close();
returnarray_keys(get_object_vars($this));
}
publicfunction__destruct(){
$this->close();
}
/*
*生产者发送消息
*/
publicfunctionsend($msg){
$this->open();
if(is_array($msg)){
$msg=json_encode($msg);
}else{
$msg=trim(strval($msg));
}
return$this->_exchange->publish($msg,$this->route_key);
}
/*
*消费者
*$fun_name=array($classobj,$function)orfunctionnamestring
*$autoack是否自动应答
*
*functionprocessMessage($envelope,$queue){
$msg=$envelope->getBody();
echo$msg."\n";//处理消息
$queue->ack($envelope->getDeliveryTag());//手动应答
}
*/
publicfunctionrun($fun_name,$autoack=True){
$this->open();
if(!$fun_name||!$this->_queue)returnFalse;
while(True){
if($autoack)$this->_queue->consume($fun_name,AMQP_AUTOACK);
else$this->_queue->consume($fun_name);
}
}
}
send.php生产者代码
'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');
$exchange_name='class-e-1';
$queue_name='class-q-1';
$route_key='class-r-1';
$ra=newRabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);
for($i=0;$i<=100;$i++){
$ra->send(date('Y-m-dH:i:s',time()));
}
exit();
accept.php消费者代码
'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');
$exchange_name='class-e-1';
$queue_name='class-q-1';
$route_key='class-r-1';
$ra=newRabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);
classA{
functionprocessMessage($envelope,$queue){
$msg=$envelope->getBody();
$envelopeID=$envelope->getDeliveryTag();
$pid=posix_getpid();
file_put_contents("log{$pid}.log",$msg.'|'.$envelopeID.''."\r\n",FILE_APPEND);
$queue->ack($envelopeID);
}
}
$a=newA();
$s=$ra->run(array($a,'processMessage'),false);
更多关于PHP相关内容感兴趣的读者可查看本站专题:《PHP数据结构与算法教程》、《php程序设计算法总结》、《php字符串(string)用法总结》、《PHP数组(Array)操作技巧大全》、《PHP常用遍历算法与技巧总结》及《PHP数学运算技巧总结》
希望本文所述对大家PHP程序设计有所帮助。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。