php pthreads多线程的安装与使用
安装 Pthreads 基本上需要重新编译 PHP,加上 --enable-maintainer-zts 参数,但是这方面的文档很少;bug 会很多,还会有很多意想不到的问题,生产环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用 Python、C 等等
一、安装
这里使用的是php-7.0.2
# Configure a thread-safe (ZTS) PHP build; pthreads requires --enable-maintainer-zts.
# --with-mysql is intentionally omitted: the legacy ext/mysql extension was removed
# in PHP 7, so configure would only report it as an unrecognized option.
./configure \
    --prefix=/usr/local/php7 \
    --with-config-file-path=/etc \
    --with-config-file-scan-dir=/etc/php.d \
    --enable-debug \
    --enable-maintainer-zts \
    --enable-pcntl \
    --enable-fpm \
    --enable-opcache \
    --enable-embed=shared \
    --enable-json=shared \
    --enable-phpdbg \
    --with-curl=shared \
    --with-mysqli=/usr/local/mysql/bin/mysql_config \
    --with-pdo-mysql
make && make install
安装pthreads
pecl install pthreads
二、Thread
<?php
// Example 1: run an anonymous Thread subclass once and wait for it to finish.
$thread = new class extends Thread {
    /**
     * Thread entry point: prints a greeting followed by this thread's id.
     */
    public function run()
    {
        echo "Hello World {$this->getThreadId()}\n";
    }
};

// join() is only attempted when start() reports success.
$thread->start() && $thread->join();
// Example 2: a thread that prints the value it was given once per second.
class workerThread extends Thread
{
    /**
     * @param mixed $i Value echoed by run() on every iteration.
     */
    public function __construct($i)
    {
        $this->i = $i;
    }

    /**
     * Thread entry point: echoes the stored value, sleeps one second, and repeats.
     * The loop has no exit condition, so this method never returns.
     */
    public function run()
    {
        while (true) {
            echo $this->i . "\n";
            sleep(1);
        }
    }
}
// Start 50 workerThread instances, each given its loop index.
// They are not joined here: workerThread::run() loops forever, so join() would block.
$workers = [];
for ($i = 0; $i < 50; $i++) {
    $workers[$i] = new workerThread($i);
    $workers[$i]->start();
}
?>
三、Worker与Stackable
Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.
<?php
/**
 * Task that runs one SQL statement on the connection returned by the executing
 * worker's getConnection() and prints every result row.
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the pthreads version installed for this PHP build.
 */
class SQLQuery extends Stackable
{
    /**
     * @param string $sql SQL text passed to PDO::query() when the task runs.
     */
    public function __construct($sql)
    {
        $this->sql = $sql;
    }

    /**
     * Executes the stored statement and print_r()s each row as an associative array.
     */
    public function run()
    {
        $dbh = $this->worker->getConnection();
        $result = $dbh->query($this->sql);
        if ($result === false) {
            // In PDO's silent error mode query() returns false instead of throwing;
            // calling fetch() on it would be a fatal error.
            printf("Query failed: %s\n", $this->sql);
            return;
        }
        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}
/**
 * Worker that opens a PDO connection in run() and hands it to its tasks
 * through getConnection().
 */
class ExampleWorker extends Worker
{
    /** @var PDO|null Connection created by run(); null until then. */
    public static $dbh;

    /**
     * @param string $name Accepted for the caller's convenience; not used by this class.
     */
    public function __construct($name)
    {
    }

    /**
     * Opens the MySQL connection and stores it in the static property.
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb', 'root', '123456');
    }

    /**
     * @return PDO|null The connection stored by run().
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}
// Queue two paged queries on a single worker, start it, then wait for it to finish.
$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
$worker->stack($sql1);

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>
四、互斥锁
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同
<?php
// Seed the counter file with the starting value; any previous content is replaced.
$counter = 0;
file_put_contents("/tmp/counter.txt", $counter);
/**
 * Thread that increments the integer stored in /tmp/counter.txt, optionally while
 * holding a mutex.
 *
 * NOTE(review): the static Mutex API appears to belong to older pthreads releases;
 * confirm it exists in the installed version.
 * NOTE(review): the file handle is opened in the constructor (creating context) and
 * used in run() (thread context); verify resources survive that hand-over.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $mutex Handle returned by Mutex::create(), or null to run unlocked.
     */
    public function __construct($mutex = null)
    {
        $this->mutex = $mutex;
        // "r+" keeps the existing contents. The previous "w+" truncated the file on
        // every construction, wiping the counter before any thread could read it.
        $this->handle = fopen("/tmp/counter.txt", "r+");
    }

    /**
     * Closes the counter file if it was opened successfully.
     */
    public function __destruct()
    {
        if (is_resource($this->handle)) {
            fclose($this->handle);
        }
    }

    /**
     * Reads the counter, adds one, writes it back and prints the new value.
     * When a mutex was supplied, the whole sequence runs under that mutex.
     */
    public function run()
    {
        if ($this->mutex) {
            Mutex::lock($this->mutex);
        }

        $counter = intval(fgets($this->handle));
        $counter++;
        rewind($this->handle);
        fputs($this->handle, $counter);
        // Flush while the lock is still held so the next thread reads the new value.
        fflush($this->handle);
        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);

        if ($this->mutex) {
            Mutex::unlock($this->mutex);
        }
    }
}
// Without a mutex.
$threads = [];
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread();
    $threads[$i]->start();
}
// Join the first batch before its slots are reused; previously these threads were
// overwritten while possibly still running and overlapped with the second batch.
for ($i = 0; $i < 50; $i++) {
    $threads[$i]->join();
}

// With a mutex. It is created with the lock flag set, so the new threads block in
// run() until the unlock call below.
$mutex = Mutex::create(true);
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread($mutex);
    $threads[$i]->start();
}
Mutex::unlock($mutex);
for ($i = 0; $i < 50; $i++) {
    $threads[$i]->join();
}
Mutex::destroy($mutex);
?>
多线程与共享内存
在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能是共享内存操作本身具备锁的功能(注意:"读取—自增—写回"并不是原子操作,严格来说仍然需要加锁才能保证结果正确)
<?php
// Create a System V shared-memory segment and store the initial counter in slot 1.
// tempnam() expects a directory as its first argument; __FILE__ is a file path, so
// the system temp directory is now passed explicitly.
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);
$counter = 0;
shm_put_var($shmid, 1, $counter);
/**
 * Thread that increments the counter kept in slot 1 of a shared-memory segment.
 * The read, increment and write below are three separate calls with no locking.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Segment handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
    }

    /**
     * Reads slot 1, adds one, stores it back and prints the new value.
     */
    public function run()
    {
        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);

        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}
// Create all 100 threads first, then start them, then wait for every one.
$threads = [];
for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
foreach ($threads as $thread) {
    $thread->start();
}
foreach ($threads as $thread) {
    $thread->join();
}

// Release the shared-memory segment.
shm_remove($shmid);
shm_detach($shmid);

// Remove the temporary file that was created only to derive the ftok() key.
if (isset($tmp) && is_file($tmp)) {
    unlink($tmp);
}
?>
五、线程同步
有些场景我们不希望 thread->start() 之后就立即开始运行程序,而是希望线程等待我们的命令。thread->wait() 的作用是:thread->start() 后线程并不会立即运行,只有收到 thread->notify() 发出的信号后才运行
<?php
// Create a System V shared-memory segment and store the initial counter in slot 1.
// tempnam() expects a directory as its first argument; __FILE__ is a file path, so
// the system temp directory is now passed explicitly.
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);
$counter = 0;
shm_put_var($shmid, 1, $counter);
/**
 * Thread that waits for a go-ahead signal before incrementing the counter kept in
 * slot 1 of a shared-memory segment.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Segment handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
        // Set to true by the controlling thread just before it calls notify().
        $this->ready = false;
    }

    /**
     * Blocks until the controller has signalled, then increments and prints the counter.
     */
    public function run()
    {
        $this->synchronized(function ($thread) {
            // Wait on a predicate rather than unconditionally: if notify() was sent
            // before this thread got here, a bare wait() would never wake up.
            while (!$thread->ready) {
                $thread->wait();
            }
        }, $this);

        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);

        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}

// Create and start all threads; each one parks itself in run() until signalled.
$threads = [];
for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
for ($i = 0; $i < 100; $i++) {
    $threads[$i]->start();
}

// Release the threads one by one. The flag is set inside the same synchronized
// block as notify() so the waiting side cannot miss the signal.
for ($i = 0; $i < 100; $i++) {
    $threads[$i]->synchronized(function ($thread) {
        $thread->ready = true;
        $thread->notify();
    }, $threads[$i]);
}

for ($i = 0; $i < 100; $i++) {
    $threads[$i]->join();
}

shm_remove($shmid);
shm_detach($shmid);
?>
六、线程池
一个Pool类
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching UPDATE
 * statement. The statement is only stored and printed here, never executed.
 */
class Update extends Thread
{
    public $running = false;
    public $row = [];

    /**
     * @param array $row Member row; run() reads its 'id' and 'bankno' keys.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts values longer than 100 bytes, logs shorter ones to bankno_error.log,
     * and prints the generated SQL (an empty line when none was generated).
     */
    public function run()
    {
        // Initialised up front: on the error path below no value is decrypted, and
        // the later strlen() previously read an undefined variable.
        $bankno = '';

        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s,%s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // Values are interpolated without escaping; use a prepared statement
            // if this SQL is ever executed rather than just printed.
            $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
            $this->sql = $sql;
        }

        printf("%s\n", $this->sql);
    }
}
/**
 * Fixed-capacity batch of Update threads: fill it with push(), run the batch with
 * start() and join(), then drop finished threads with clean().
 *
 * NOTE(review): pthreads itself provides a class named Pool; confirm this
 * declaration does not collide with it in the target environment.
 */
class Pool
{
    /** @var Update[] Threads currently held, keyed by insertion index. */
    public $pool = [];

    /** @var int Maximum number of threads held at once. */
    public $count;

    /**
     * @param int $count Capacity of the pool.
     */
    public function __construct($count)
    {
        $this->count = $count;
    }

    /**
     * Wraps a row in a new Update thread if there is room.
     *
     * @param array $row Row handed to the Update constructor.
     * @return bool True when the thread was added, false when the pool is full.
     */
    public function push($row)
    {
        if (count($this->pool) >= $this->count) {
            return false;
        }

        $this->pool[] = new Update($row);
        return true;
    }

    /**
     * Starts every held thread that has not been started yet.
     */
    public function start()
    {
        foreach ($this->pool as $worker) {
            // Guard so that calling start() twice does not try to restart a thread.
            if (!$worker->isStarted()) {
                $worker->start();
            }
        }
    }

    /**
     * Waits for every held thread to finish.
     */
    public function join()
    {
        foreach ($this->pool as $worker) {
            $worker->join();
        }
    }

    /**
     * Removes every thread that is no longer running.
     */
    public function clean()
    {
        foreach ($this->pool as $id => $worker) {
            if (!$worker->isRunning()) {
                unset($this->pool[$id]);
            }
        }
    }
}
try {
    // NOTE(review): $dbhost, $dbname, $dbuser and $dbpw are not defined in this
    // snippet; presumably they come from configuration loaded elsewhere.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        [
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            // Throw on SQL errors so they reach the catch block below; in silent
            // mode query() returns false and fetch() on it is a fatal error.
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]
    );

    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);

    $pool = new Pool(5);
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        while (true) {
            if ($pool->push($member)) {
                // Task queued in the pool.
                break;
            }

            // Pool is full: run the whole batch, wait for it, then empty the pool.
            $pool->start();
            $pool->join();
            $pool->clean();
        }
    }

    // Run whatever is left in the final, partially filled batch.
    $pool->start();
    $pool->join();

    $dbh = null;
} catch (Exception $e) {
    echo '[', date('H:i:s'), ']', '系统错误', $e->getMessage(), "\n";
}
?>
动态队列线程池
上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching UPDATE
 * statement. The statement is only stored and printed here, never executed.
 */
class Update extends Thread
{
    public $running = false;
    public $row = [];

    /**
     * @param array $row Member row; run() reads its 'id' and 'bankno' keys.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts values longer than 100 bytes, logs shorter ones to bankno_error.log,
     * and prints the generated SQL (an empty line when none was generated).
     */
    public function run()
    {
        // Initialised up front: on the error path below no value is decrypted, and
        // the later strlen() previously read an undefined variable.
        $bankno = '';

        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s,%s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // Values are interpolated without escaping; use a prepared statement
            // if this SQL is ever executed rather than just printed.
            $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
            $this->sql = $sql;
        }

        printf("%s\n", $this->sql);
    }
}
try {
    // NOTE(review): $dbhost, $dbname, $dbuser and $dbpw are not defined in this
    // snippet; presumably they come from configuration loaded elsewhere.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        [
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            // Throw on SQL errors so they reach the catch block below.
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ]
    );

    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);

    // At most 5 threads are alive at once, keyed by member id.
    $pool = [];
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        $id = $member['id'];
        while (true) {
            if (count($pool) < 5) {
                $pool[$id] = new Update($member);
                $pool[$id]->start();
                break;
            }

            // Pool is full: reap finished threads to free a slot.
            foreach ($pool as $name => $worker) {
                if (!$worker->isRunning()) {
                    $worker->join();
                    unset($pool[$name]);
                }
            }
            // Brief pause so this loop does not spin at full CPU while waiting.
            usleep(1000);
        }
    }

    // Wait for the threads still in flight before the script ends.
    foreach ($pool as $worker) {
        $worker->join();
    }

    $dbh = null;
} catch (Exception $e) {
    echo '【', date('H:i:s'), '】', '【系统错误】', $e->getMessage(), "\n";
}
?>
pthreads Pool 类
<?php
/**
 * Worker that carries a shared logger for the tasks it executes.
 */
class WebWorker extends Worker
{
    /**
     * Logger assigned by the constructor. Tasks read it as $this->worker->logger,
     * so it is declared public. The former declaration was misspelled "$loger" and
     * never matched the property that is actually assigned.
     *
     * @var SafeLog
     */
    public $logger;

    /**
     * @param SafeLog $logger Logger made available to tasks.
     */
    public function __construct(SafeLog $logger)
    {
        $this->logger = $logger;
    }
}
/**
 * Task that logs which worker thread executed it and then marks itself complete.
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class WebWork extends Stackable
{
    /** @var bool|null Set to true at the end of run(). */
    protected $complete;

    /**
     * @return bool|null True once run() has finished; null before that.
     */
    public function isComplete()
    {
        return $this->complete;
    }

    /**
     * Writes one log line through the worker's logger and flags completion.
     */
    public function run()
    {
        $this->worker
            ->logger
            ->log(
                "%s executing in Thread #%lu",
                __CLASS__,
                $this->worker->getThreadId()
            );

        $this->complete = true;
    }
}
/**
 * Logger shared between worker threads; each message is printed while holding the
 * object's monitor so lines from different threads are written one at a time.
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class SafeLog extends Stackable
{
    /**
     * Prints a vsprintf()-formatted line followed by a newline.
     *
     * The method is public because tasks call it from another class
     * ($this->worker->logger->log(...)), which ordinary PHP visibility rules do not
     * allow for a protected method.
     *
     * @param string $message Format string; nothing is printed when it is empty.
     * @param mixed  ...$args Values substituted into the format string.
     */
    public function log($message, ...$args)
    {
        if (!$message) {
            return;
        }

        $this->synchronized(function () use ($message, $args) {
            echo vsprintf("{$message}\n", $args);
        });
    }
}
// Pool of up to 8 WebWorker threads; every worker receives the same SafeLog.
$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

// Submit 14 tasks, more than there are workers, so some workers run several tasks.
for ($i = 0; $i < 14; $i++) {
    $pool->submit(new WebWork());
}

$pool->shutdown();

// A task is reported as collectable once it says it has completed.
$pool->collect(function ($work) {
    return $work->isComplete();
});

var_dump($pool);
?>
七、多线程文件安全读写
LOCK_SH取得共享锁定(读取的程序)
LOCK_EX取得独占锁定(写入的程序)
LOCK_UN释放锁定(无论共享或独占)
LOCK_NB如果不希望flock()在锁定时堵塞
<?php
// Part 1: take an exclusive lock, replace the file's contents, release the lock.
// "c+" opens for read/write and creates the file if missing without truncating it,
// so truncation only happens after the lock is held ("r+" failed on a missing file).
$fp = fopen("/tmp/lock.txt", "c+");
if ($fp === false) {
    echo "Couldn't open the lock file!";
    exit(-1);
}

if (flock($fp, LOCK_EX)) {  // acquire an exclusive lock
    ftruncate($fp, 0);      // truncate file
    fwrite($fp, "Write something here\n");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // release the lock
} else {
    echo "Couldn't get the lock!";
}

fclose($fp);

// Part 2: try to take the lock without blocking and give up if it is held elsewhere.
$fp = fopen('/tmp/lock.txt', 'r+');
if ($fp === false) {
    echo 'Unable to open lock file';
    exit(-1);
}

if (!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    fclose($fp);
    exit(-1);
}

// Closing the handle also releases the lock.
fclose($fp);
?>
八、多线程与数据连接
pthreads 与 PDO 同时使用时,需要注意一点:需要静态声明 public static $dbh;并且通过单例模式访问数据库连接。
Worker与PDO
<?php
/**
 * Task that lists members using the connection returned by the executing worker's
 * getConnection().
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class Work extends Stackable
{
    public function __construct()
    {
    }

    /**
     * Runs the query and print_r()s every row as an associative array.
     */
    public function run()
    {
        $dbh = $this->worker->getConnection();

        // NOTE(review): the row count after "limit" was lost from the original
        // text; 50 is a placeholder — confirm the intended value.
        $sql = "select id,name from members order by id desc limit 50";
        $result = $dbh->query($sql);
        if ($result === false) {
            // In PDO's silent error mode query() returns false instead of throwing.
            printf("Query failed: %s\n", $sql);
            return;
        }

        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}
/**
 * Worker that opens a PDO connection in run() and hands it to its tasks
 * through getConnection().
 */
class ExampleWorker extends Worker
{
    /** @var PDO|null Connection created by run(); null until then. */
    public static $dbh;

    /**
     * @param string $name Accepted for the caller's convenience; not used by this class.
     */
    public function __construct($name)
    {
    }

    /**
     * The run method should just prepare the environment for the work that is coming...
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=...;dbname=example', 'www', '');
    }

    /**
     * @return PDO|null The connection stored by run().
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}
// Queue one task on a single worker, start it, then wait for it to finish.
$worker = new ExampleWorker("My Worker Thread");

$work = new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>
Pool与PDO
在线程池中链接数据库
# cat pool.php
<?php
/**
 * Worker that keeps a reference to the Logging object it was constructed with.
 */
class ExampleWorker extends Worker
{
    /** @var Logging Logger assigned by the constructor. */
    protected $logger;

    /**
     * @param Logging $logger Logger stored on the worker.
     */
    public function __construct(Logging $logger)
    {
        $this->logger = $logger;
    }
}
/**
 * Task that looks up the deposit trade for one account row and, if one exists,
 * marks the account as paid. Each task opens its own database connection.
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class Work extends Stackable
{
    /**
     * @param array $number Account row; run() reads its 'name' and 'order' keys.
     */
    public function __construct($number)
    {
        $this->number = $number;
    }

    /**
     * Queries MT_TRADES for the account's deposit and updates db_example.accounts
     * when a matching trade is found, then prints a progress line.
     */
    public function run()
    {
        $dbhost = 'db.example.com';   // database server
        $dbuser = 'example.com';      // database user
        $dbpw = 'password';           // database password
        $dbname = 'example_real';     // database name

        // NOTE(review): the port number and the digit in "UTF8" were lost from the
        // original text; 3306 (MySQL's default port) and UTF8 are assumed.
        $dbh = new PDO(
            "mysql:host=$dbhost;port=3306;dbname=$dbname",
            $dbuser,
            $dbpw,
            [
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true,
                PDO::ATTR_PERSISTENT => true,
                // Fail loudly instead of returning false from prepare()/execute().
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            ]
        );

        // Bound parameters replace the former string concatenation, so values taken
        // from the row can no longer alter the statement.
        // NOTE(review): the CMD literal is empty in the original text and may have
        // lost a digit — confirm the intended value.
        $select = $dbh->prepare(
            "select OPEN_TIME, `COMMENT` from MT_TRADES"
            . " where LOGIN = :login and CMD = '' and `COMMENT` = :comment"
        );
        $select->execute([
            ':login' => $this->number['name'],
            ':comment' => $this->number['order'] . ':DEPOSIT',
        ]);
        $mt_trades = $select->fetch(PDO::FETCH_ASSOC);
        $select = null;

        if ($mt_trades) {
            $update = $dbh->prepare(
                "UPDATE db_example.accounts SET paystatus = '成功', deposit_time = :deposit_time"
                . " where `order` = :order"
            );
            $update->execute([
                ':deposit_time' => $mt_trades['OPEN_TIME'],
                ':order' => $this->number['order'],
            ]);
        }

        $dbh = null;

        printf(
            "runtime: %s, %s, %s\n",
            date('Y-m-d H:i:s'),
            $this->worker->getThreadId(),
            $this->number['order']
        );
    }
}
/**
 * Logger object handed to every worker of the pool. Its constructor also opens a
 * database connection and keeps it in a static property.
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class Logging extends Stackable
{
    /** @var PDO|null Connection opened by the constructor. */
    protected static $dbh;

    /**
     * Opens the database connection.
     */
    public function __construct()
    {
        $dbhost = 'db.example.com';   // database server
        $dbuser = 'example.com';      // database user
        $dbpw = 'password';           // database password
        $dbname = 'example_real';     // database name

        // NOTE(review): the port number and the digit in "UTF8" were lost from the
        // original text; 3306 (MySQL's default port) and UTF8 are assumed.
        self::$dbh = new PDO(
            "mysql:host=$dbhost;port=3306;dbname=$dbname",
            $dbuser,
            $dbpw,
            [
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true,
            ]
        );
    }

    /**
     * Prints a vsprintf()-formatted line followed by a newline.
     *
     * @param string $message Format string; nothing is printed when it is empty.
     * @param mixed  ...$args Values substituted into the format string.
     */
    protected function log($message, ...$args)
    {
        if ($message) {
            echo vsprintf("{$message}\n", $args);
        }
    }

    /**
     * @return PDO|null The connection opened by the constructor.
     */
    protected function getConnection()
    {
        return self::$dbh;
    }
}
// NOTE(review): the pool size literal was lost from the original text; 8 is a
// placeholder — confirm the intended value.
$pool = new Pool(8, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com';   // database server
$dbuser = 'example.com';      // database user
$dbpw = 'password';           // database password
$dbname = 'db_example';       // database name

// NOTE(review): the port number and the digit in "UTF8" were lost from the
// original text; 3306 (MySQL's default port) and UTF8 are assumed.
$dbh = new PDO(
    "mysql:host=$dbhost;port=3306;dbname=$dbname",
    $dbuser,
    $dbpw,
    [
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true,
        // Throw on SQL errors rather than returning false from query().
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    ]
);

// One task per account that has no deposit time yet.
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while ($account = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($account));
}

$pool->shutdown();
?>
进一步改进上面程序,我们使用单例模式$this->worker->getInstance();全局仅仅做一次数据库连接,线程使用共享的数据库连接
<?php
/**
 * Worker that opens one database connection in run() and exposes it to its tasks
 * through getInstance(), so tasks do not connect individually.
 */
class ExampleWorker extends Worker
{
    /** @var PDO|null Connection created by run(); null until then. */
    protected static $dbh;

    public function __construct()
    {
    }

    /**
     * Opens the connection and stores it in the static property.
     */
    public function run()
    {
        $dbhost = 'db.example.com';   // database server
        $dbuser = 'example.com';      // database user
        $dbpw = 'password';           // database password
        $dbname = 'example';          // database name

        // NOTE(review): the port number and the digit in "UTF8" were lost from the
        // original text; 3306 (MySQL's default port) and UTF8 are assumed.
        self::$dbh = new PDO(
            "mysql:host=$dbhost;port=3306;dbname=$dbname",
            $dbuser,
            $dbpw,
            [
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true,
                PDO::ATTR_PERSISTENT => true,
                // Tasks wrap their statements in catch (PDOException); that only
                // works when the connection throws instead of returning false.
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            ]
        );
    }

    /**
     * Public because tasks call it from another class ($this->worker->getInstance()).
     *
     * @return PDO|null The connection stored by run().
     */
    public function getInstance()
    {
        return self::$dbh;
    }
}
/**
 * Task that decrypts one member's mobile number and stores its digest in
 * members_digest, using the connection returned by the worker's getInstance().
 *
 * NOTE(review): Stackable appears to belong to older pthreads releases; confirm it
 * is available in the installed version.
 */
class Work extends Stackable
{
    /**
     * @param array $data Member row; run() reads its 'id' and 'mobile' keys.
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Updates members_digest for this member and prints a progress line.
     * Database failures are appended to mobile_error.log instead of being raised.
     */
    public function run()
    {
        // NOTE(review): the length literal was lost from the original text; 11 is a
        // placeholder — confirm the intended value.
        $mobileLength = 11;

        // Assigned before the try block so the catch handler and the final printf
        // always have defined values to report.
        $id = $this->data['id'];
        $mobile = safenet_decrypt($this->data['mobile']);

        try {
            $dbh = $this->worker->getInstance();

            // Keep only the trailing digits of over-long values.
            if (strlen($mobile) > $mobileLength) {
                $mobile = substr($mobile, -$mobileLength);
            }

            if ($mobile === 'null') {
                // The literal text "null" is stored as an empty string, unhashed.
                $mobile = '';
                $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
            } else {
                $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
            }

            $sth = $dbh->prepare($sql);
            if ($sth === false) {
                // In PDO's silent error mode prepare() returns false instead of
                // throwing; route that case to the same error handling.
                throw new PDOException('prepare failed');
            }
            $sth->bindValue(':mobile', $mobile);
            $sth->bindValue(':id', $id);
            $sth->execute();
        } catch (PDOException $e) {
            $error = sprintf("%s,%s\n", $mobile, $id);
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }

        printf(
            "runtime: %s, %s, %s, %s\n",
            date('Y-m-d H:i:s'),
            $this->worker->getThreadId(),
            $mobile,
            $id
        );
    }
}
// NOTE(review): the pool size literal was lost from the original text; 8 is a
// placeholder — confirm the intended value.
$pool = new Pool(8, \ExampleWorker::class, []);

$dbhost = 'db.example.com';   // database server
$dbuser = 'example.com';      // database user
$dbpw = 'password';           // database password
$dbname = 'example';          // database name

// This connection is used only by the main thread to read the rows to process.
// NOTE(review): the port number and the digit in "UTF8" were lost from the
// original text; 3306 (MySQL's default port) and UTF8 are assumed.
$dbh = new PDO(
    "mysql:host=$dbhost;port=3306;dbname=$dbname",
    $dbuser,
    $dbpw,
    [
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true,
        // Throw on SQL errors rather than returning false from query().
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    ]
);

// Submit one task per member row.
$sql = "select id,mobile from members order by id asc";
$row = $dbh->query($sql);
while ($members = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($members));
}

$pool->shutdown();
?>
多线程中操作数据库总结
总的来说pthreads仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目
数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。
<?php
// Open a persistent connection so it can be reused instead of reconnecting each time.
// NOTE(review): $user and $pass are not defined in this snippet; presumably they
// are supplied by the surrounding application.
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, [
    PDO::ATTR_PERSISTENT => true,
]);
?>
关于 php pthreads 多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。