PHP消息队列的一些实现
消息队列(Message Queue):把消息按照产生的次序加入队列,而由另外的处理程序/模块将其从队列中取出,并加以处理;从而形成了一个基本的消息队列。使用消息队列可以很好地将任务以异步的方式进行处理,或者进行数据传送和存储等。例如,当你频繁地向数据库中插入数据、频繁地向搜索引擎提交数据,就可采取消息队列来异步插入。另外,还可以将较慢/较复杂的处理逻辑、有并发数量限制的处理逻辑,通过消息队列放在后台处理。
应用场景:短信服务、电子邮件服务、图片处理服务、好友动态推送服务等。总结下来就是,一些耗时的操作,但是并不要求立即获取执行的结果。 (参考文章)
目录:
- 应用场景-短信服务
- 消息队列的实现-redis
- 消息队列的实现-mongodb
- 消息队列的实现-memcached
- redis-mongodb-memcached性能的对比
- 消息队列的实现amqp-rabbitmq
- 消息队列的总结
应用场景-短信服务
一般发送短信,都是调用服务商的接口,大多都是基于HTTP协议的,body的封装可能是js、xml等。在php中,使用socket,大概有file_get_contents
、curl 、 fsockopen 、stream系等。而这个过程是比较耗时的,如果存在大量这样的操作,必然会降低服务器的响应时间。
所以我们可以使用消息队列来解决这个问题。
消息队列的实现-redis
利用redis实现消息队列,是基于redis本身支持的列表数据类型,其本身就是队列的一种的实现,支持入队rPush、出队lPop。
安装redis、php-redis的步骤省略,下面几个实现也省略了,都比较简单。
其关键的代码如下:
class RedisQueue implements IQueue{
private $_conn = null;
private $_connInfo = null;
public function __construct($connInfo){
if(empty($connInfo['key'])){
$connInfo['key'] = 'rmq';
}
$this->_connInfo = $connInfo;
}
public function open(){
if(empty($this->_connInfo)){
return false;
}
if($this->_connInfo['newlink']){
$this->close();
}
if($this->_conn){
return true;
}
$this->_conn = new Redis();
if(empty($this->_conn)){
return false;
}
if($this->_connInfo['persistent']){
if(!$this->_conn->pconnect($this->_connInfo['host'],$this->_connInfo['port'])){
return false;
}
}else{
if(!$this->_conn->connect($this->_connInfo['host'],$this->_connInfo['port'])){
return false;
}
}
return true;
}
public function push($message){
if(!$this->open()){
return false;
}
return $this->_conn->rPush($this->_connInfo['key'],$message);
}
public function pop(){
if(!$this->open()){
return false;
}
return $this->_conn->lPop($this->_connInfo['key']);
}
}
经测试,redis的性能体现得不错。性能数据请看下面。
消息队列的实现-mongodb
mongodb使用的是timestamp+index实现的消息队列。还有一种使用findAndModify实现队列,但是出队并不删除元素,这个随着时间的推移,内存必爆,这里就不推荐了,网上有人做的这种实现。timestamp各种关系性数据库都能实现,这里也不实现mysql等数据库的了.
其关键的代码如下:
class MongodbQueue implements IQueue{
private $_conn;
private $_connInfo;
/**
* 返回当前的毫秒数
* @return number
*/
private function _getTimeStamp(){
return intval(microtime(true)*1000);
}
public function __construct($connInfo){
if(empty($connInfo['db_name'])){
$connInfo['db_name'] = 'mmq';
}
if(empty($connInfo['collection_name'])){
$connInfo['collection_name'] = 'cmmq';
}
$this->_connInfo = $connInfo;
}
public function open(){
if($this->_conn){
return true;
}
if(!class_exists('MongoClient')){
return false;
}
$this->_conn = new MongoClient($this->_connInfo['server'],$this->_connInfo['server_options']);
if(empty($this->_conn)){
return false;
}
//初始化数据库,集合,索引等
$collection = $this->_conn->{$this->_connInfo['db_name']}->{$this->_connInfo['collection_name']};
$collection->createIndex(array('timestamp'=>1));
return true;
}
public function push($message){
if(!$this->open()){
return false;
}
$arr = array(
'timestamp' => $this->_getTimeStamp(),
'message' => $message
);
$collection = $this->_conn->{$this->_connInfo['db_name']}->{$this->_connInfo['collection_name']};
$ret = $collection->insert($arr);
return isset($ret['ok']) ? ($ret['ok'] ? true : false) : ($ret ? true : false);
}
public function pop(){
if(!$this->open()){
return false;
}
$collection = $this->_conn->{$this->_connInfo['db_name']}->{$this->_connInfo['collection_name']};
$cursor = $collection->find()->sort(array('timestamp'=>1))->limit(1);
$ret = $cursor->getNext();
if(empty($ret)){
return false;
}
if($collection->remove(array('_id'=>$ret['_id']),array("justOne"=>true))){
return $ret;
}else{
return false;
}
}
}
关于mongodb的使用这里也不多说了,可以关注一下这篇文章。
这里额外提一下遇到的坑,我在安装的时候,居然安装错了mongodb。 php 扩展mongodb与mongo的区别: mongodb:帮助手册 mongo:帮助手册 说简单点就是提供的接口不同。
mongodb(version 0.6.3)安装中的坑:
apt-get install openssl
apt-get install libssl-dev
apt-get install libssl0.9.8
但是扩展在make的时候报X509未定义,经过分析得知是头文件未包含,但存在。在mongodb下有个php-ssl.h中HAVE_OPENSSL_EXT
宏起着开关作用,注释即可。
消息队列的实现-memcached
memcached版的消息队列是参照这篇文章-memcache消息队列 实现的。该实现是用两个指针,分别指向队首与队尾,然后对队列进行操作。 这不具备事务特性,在多进程下会导致数据混乱的情况。
- 不具备事务特性:设想移动指针成功,但出队失败。
- 不具备原子性:设想只有一个元素,两个进程同时读取指针,发现合法,同时移动指针。但是后出队的会出现没有元素出队而失败的情况。
但是我们还是实现一下,看看其性能怎么样。
其关键的代码如下:
class MemcachedQueue implements IQueue{
private $_conn = null;
private $_connInfo = null;
private $_key = 'mmq';
public function __construct($connInfo){
if($connInfo['key']){
$this->_key = $connInfo['key'];
}
$this->_connInfo = $connInfo;
}
public function open(){
if(empty($this->_connInfo)){
return false;
}
if($this->_conn){
return true;
}
if($this->_connInfo['persistent_id ']){
$this->_conn = new Memcached($this->_connInfo['persistent_id ']);
}else{
$this->_conn = new Memcached();
}
if(empty($this->_conn)){
return false;
}
if($this->_connInfo['servers']){
$this->_conn->addServers($this->_connInfo['servers']);
}else{
$this->_conn->addServer($this->_connInfo['host'],$this->_connInfo['port'],$this->_connInfo['weight'] ? : 0);
}
return true;
}
private function _setCounter( $key, $offset, $time=0 ){
if(!$this->open()){
return false;
}
$val = $this->_conn->get($key);
if( !is_numeric($val) || $val < 0 ){
$ret = $this->_conn->set( $key, 0, $time );
if( !$ret ) return false;
$val = 0;
}
$offset = intval( $offset );
if( $offset > 0 ){
return $this->_conn->increment( $key, $offset );
}elseif( $offset < 0 ){
return $this->_conn->decrement( $key, -$offset );
}
return $val;
}
public function push($message){
if(!$this->open()){
return false;
}
$pushKey = $this->_key.'w';
if(false === ($pushIndex = $this->_setCounter($pushKey, 1))){
return false;
}
$valueKey = $this->_key.$pushIndex;
return $this->_conn->set($valueKey,$message);
}
public function pop(){
if(!$this->open()){
return false;
}
$pushKey = $this->_key.'w';
if(false === ($pushIndex = $this->_setCounter($pushKey, 0))){
return false;
}
$popKey = $this->_key.'r';
if(false === ($popIndex = $this->_setCounter($popKey, 0))){
return false;
}
++$popIndex;
if($pushIndex < $popIndex){
return false;
}
$valueKey = $this->_key.$popIndex;
if(false === ($this->_setCounter($popKey, 1))){
return false;
}
$ret = $this->_conn->get($valueKey);
if(empty($ret)){
return false;
}
$this->_conn->delete($valueKey);
return $ret;
}
}
性能测试发现数据表现还不错!
redis-mongodb-memcached性能的对比
最后我们比较一下三种实现的性能,最后写了一个benchmark的脚本:
error_reporting(~E_WARNING & ~E_NOTICE & E_ALL);
require 'IQueue.php';
require 'MemcachedQueue.php';
require 'MongodbQueue.php';
require 'RedisQueue.php';
require 'MessageQueueProxy.php';
$config = require 'config.php';
$class = $config['driver'];
$mq = new $class($config['driverInfo']);
$mobileMessageObj = new MessageQueueProxy($mq);
$time = 100;
$num = 10000;
$success = 0;
for($j=0;$j<$time;++$j){
$start = microtime(true);
for($i=0;$i<$num;++$i){
$mobile = '1355'.rand(1000000,9999999);
$content = 'this is your phone number: '.$mobile.'.';
$message = array(
'mobile' => $mobile,
'content' => $content
);
if(!$mobileMessageObj->push(serialize($message))){
echo "on index:",($j*$num+$i+1)," push error !";
goto pushout;
}
++$success;
}
$result[] = microtime(true) - $start;
echo intval(($j+1)/$time*100),"% ";
}
pushout:
$request = $num * $time;
echo "\n";
echo "class: $class\n";
echo "push\n";
echo "times: $time num: $num\n";
echo "request: ",$request,"\n";
echo "faild: ",$request-$success,"\n";
echo "max: ",max($result)," s/{$num}times\n";
echo "min: ",min($result)," s/{$num}times\n";
echo "takes: ",$sum = array_sum($result)," s\n";
echo "average: ",$sum/count($result)," s/{$num}times\n";
echo "rqs: ",intval($time*$num/$sum),"\n";
if($success<$request){
echo "push exit\n";
exit();
}
$result = array();
$success = 0;
for($j=0;$j<$time;++$j){
$start = microtime(true);
for($i=0;$i<$num;++$i){
if($mobileMessageObj->pop()===false){
echo "on index:",($j*$num+$i+1)," pop error !";
goto popout;
}
++$success;
}
$result[] = microtime(true) - $start;
echo intval(($j+1)/$time*100),"% ";
}
popout:
echo "\n";
echo "pop\n";
echo "times: $time num: $num\n";
echo "request: ",$request,"\n";
echo "faild: ",$request-$success,"\n";
echo "max: ",max($result)," s/{$num}times\n";
echo "min: ",min($result)," s/{$num}times\n";
echo "takes: ",$sum = array_sum($result)," s\n";
echo "average: ",$sum/count($result)," s/{$num}times\n";
echo "rqs: ",intval($time*$num/$sum),"\n";
其中各项软件的版本如下(都是当前最新版本的):
php: 5.4.40
Linux: 3.13.0-34-generic(buildd@allspice) (gcc version 4.8.2 (Ubuntu 4.8.2-19ubuntu1) )
memory: 4GB
cpu: Intel(R)_Core(TM)_i5@3.00GHz*4
redis: 3.0.1
php-redis: 2.2.7
mongodb: mongodb-linux-x86_64-ubuntu1404-3.0.2
php-mongo: 1.6.7
memcached: 1.4.24
php-memcached: 2.2.0
redis的数据如下:
class: RedisQueue
push
times: 100 num: 10000
request: 1000000
faild: 0
max: 0.37430000305176 s/10000times
min: 0.26855707168579 s/10000times
takes: 32.56348824501 s
average: 0.3256348824501 s/10000times
rqs: 30709
pop
times: 100 num: 10000
request: 1000000
faild: 0
max: 0.28984785079956 s/10000times
min: 0.21943783760071 s/10000times
takes: 26.595669269562 s
average: 0.26595669269562 s/10000times
rqs: 37600
mongodb+index的数据如下:
class: MongodbQueue
push
times: 100 num: 10000
request: 1000000
faild: 0
max: 1.8860659599304 s/10000times
min: 1.038162946701 s/10000times
takes:115.00487303734 s
average: 1.1500487303734 s/10000times
rqs: 8695
pop
times: 100 num: 10000
request: 1000000
faild: 0
max: 2.2940940856934 s/10000times
min: 1.8466329574585 s/10000times
takes: 197.70558190346 s
average: 1.9770558190346 s/10000times
rqs: 5058
mongodb+noindex的数据如下:
class: MongodbQueue
push
times: 100 num: 10000
request: 1000000
faild: 0
max: 1.4391729831696 s/10000times
min: 1.0708310604095 s/10000times
takes: 120.37140989304 s
average: 1.2037140989304 s/10000times
rqs: 8307
//下面根本跑不动了
memcached的数据如下:
class: MemcachedQueue
push
times: 100 num: 10000
request: 1000000
faild: 0
max: 0.83561706542969 s/10000times
min: 0.67476201057434 s/10000times
takes: 76.049747467041 s
average: 0.76049747467041 s/10000times
rqs: 13149
pop
times: 100 num: 10000
request: 1000000
faild: 0
max: 1.3662950992584 s/10000times
min: 1.2345328330994 s/10000times
takes: 129.71623921394 s
average: 1.2971623921394 s/10000times
rqs: 7709
可以看出:
- 可以看出redis在push上是mongodb的3.53倍,在pop上是mongodb的7.43倍.
- mongodb没有索引,pop根本跑不动了。过了10多秒,我进数据库一看,才删除了99条.
- 可以看出redis的push是memcached的2.3倍,redis的pop是memcached的4.5倍.
消息队列的实现amqp-rabbitmq
先吐槽一下rabbitmq的安装,最开始我是在Ubuntu上面安装的,首先是源码编译,不行。然后apt-get,也不行!最后dpkg安装,还是不行!! 没办法,Ubuntu实在搞不动了。
我就换了centos,还是源码编译,php-amqp老是过不了。然后pecl安装,还是不行。
最后发现,这两样都会用到librabbitmq这个库,而报错是没有amqp_tcp_socket.h
这个头文件。
正好我源码编译过rabbitmq-c
源代码,发现刚好会提供这个文件,但是API对不上啊,这个库是0.6.0的,地址是https://github.com/alanxz/rabbitmq-c/releases/tag/v0.6.0,机智的我突然反应过来,很可能是版本不对。但是没有以前版本的链接啊,机智的我又突然反应过来,就是把v0.6.0改成v0.4.0,妈蛋,居然有下载页面了。
然后我小心翼翼的下载下来,删除原来的文件,重新一编译,php-amqp居然通过了。立马修改php.ini,再编写测试代码,发现队列成功入队出队了!!而这时,已经一点了,距我刚开始安装软件已经四个多小时了,一上午就装软件装完了。然后下午把RabbitQueue.php写完,然后做Benchmark,又发现两个内存相关的参数老是调不对,每次跑到70万就要停半天才继续。应该是内存数据交换到硬盘造成的。最后的benchmark就降低到50万了。但是效果还是不错。
下面是测试的数据:
class: RabbitQueue
push
times: 100 num: 5000
request: 500000
faild: 0
max: 0.56668591499329 s/5000times
min: 0.019957065582275 s/5000times
takes: 17.697237014771 s
average: 0.17697237014771 s/5000times
rqs: 28252
pop
times: 100 num: 5000
request: 500000
faild: 0
max: 1.709969997406 s/5000times
min: 0.56408095359802 s/5000times
takes: 85.071734666824 s
average: 0.85071734666824 s/5000times
rqs: 5877
VMware 10
centos 6.6
memory 2GB
cpu i5*4
翻阅了N多的rabbitmq的资料,对其机制还是有基本的了解了。最后总结一下,rabbitmq可能在性能上不是很优越,但是它是一套完善的解决方案。个人觉得比redis更适合生产环境。还有redis没有通知机制,而rabbitmq是基于长连接的,是有推送的。这个就比redis的轮询高级很多了。
消息队列的总结
- 性能redis>memcached>mongodb
- rabbitmq是成熟的解决方案
- mongodb与amqp扩展的安装比较坑