C语言实现支持动态拓展和销毁的线程池
本文实例介绍了C语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下
实现功能
- 1.初始化指定个数的线程
- 2.使用链表来管理任务队列
- 3.支持拓展动态线程
- 4.如果闲置线程过多,动态销毁部分线程
#include<stdio.h> #include<pthread.h> #include<stdlib.h> #include<signal.h> /*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/ typedefstructthread_worker_s{ void*(*process)(void*arg);//处理函数 void*arg;//参数 structthread_worker_s*next; }thread_worker_t; #defineboolint #definetrue1 #definefalse0 /*线程池中各线程状态描述*/ #defineTHREAD_STATE_RUN0 #defineTHREAD_STATE_TASK_WAITING1 #defineTHREAD_STATE_TASK_PROCESSING2 #defineTHREAD_STATE_TASK_FINISHED3 #defineTHREAD_STATE_EXIT4 typedefstructthread_info_s{ pthread_tid; intstate; structthread_info_s*next; }thread_info_t; staticchar*thread_state_map[]={"创建","等待任务","处理中","处理完成","已退出"}; /*线程压缩的时候只有0,1,2,4状态的线程可以销毁*/ /*线程池管理器*/ #defineTHREAD_BUSY_PERCENT0.5/*线程:任务=1:2值越小,说明任务多,增加线程*/ #defineTHREAD_IDLE_PERCENT2/*线程:任务=2:1值大于1,线程多于任务,销毁部分线程*/ typedefstructthread_pool_s{ pthread_mutex_tqueue_lock;//队列互斥锁,即涉及到队列修改时需要加锁 pthread_cond_tqueue_ready;//队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了 thread_worker_t*head;//任务队列头指针 boolis_destroy;//线程池是否已经销毁 intnum;//线程的个数 intrnum;;//正在跑的线程 intknum;;//已杀死的线程 intqueue_size;//工作队列的大小 thread_info_t*threads;//线程组id,通过pthread_join(thread_ids[0],NULL)来执行线程 pthread_tdisplay;//打印线程 pthread_tdestroy;//定期销毁线程的线程id pthread_textend; floatpercent;//线程个数于任务的比例rnum/queue_size intinit_num; pthread_cond_textend_ready;//如果要增加线程 }thread_pool_t; /*-------------------------函数声明----------------------*/ /** *1.初始化互斥变量 *2.初始化等待变量 *3.创建指定个数的线程线程 */ thread_pool_t*thread_pool_create(intnum); void*thread_excute_route(void*arg); /*调试函数*/ voiddebug(char*message,intflag){ if(flag) printf("%s\n",message); } void*display_thread(void*arg); /** *添加任务包括以下几个操作 *1.将任务添加到队列末尾 *2.通知等待进程来处理这个任务pthread_cond_singal(); */ intthread_pool_add_worker(thread_pool_t*pool,void*(*process)(void*arg),void*arg);//网线程池的队列中增加一个需要执行的函数,也就是任务 /** *销毁线程池,包括以下几个部分 *1.通知所有等待的进程pthread_cond_broadcase *2.等待所有的线程执行完 *3.销毁任务列表 *4.释放锁,释放条件 *4.销毁线程池对象 */ void*thread_pool_is_need_recovery(void*arg); void*thread_pool_is_need_extend(void*arg); voidthread_pool_destory(thread_pool_t*pool); thread_pool_t*thread_pool_create(intnum){ if(num<1){ returnNULL; } thread_pool_t*p; p=(thread_pool_t*)malloc(sizeof(structthread_pool_s)); if(p==NULL) returnNULL; p->init_num=num; /*初始化互斥变量与条件变量*/ pthread_mutex_init(&(p->queue_lock),NULL); pthread_cond_init(&(p->queue_ready),NULL); /*设置线程个数*/ p->num=num; p->rnum=num; p->knum=0; p->head=NULL; p->queue_size=0; p->is_destroy=false; inti=0; thread_info_t*tmp=NULL; for(i=0;i<num;i++){ /*创建线程*/ tmp=(structthread_info_s*)malloc(sizeof(structthread_info_s)); if(tmp==NULL){ free(p); returnNULL; }else{ tmp->next=p->threads; p->threads=tmp; } pthread_create(&(tmp->id),NULL,thread_excute_route,p); tmp->state=THREAD_STATE_RUN; } /*显示*/ pthread_create(&(p->display),NULL,display_thread,p); /*检测是否需要动态线程*/ //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p); /*动态销毁*/ pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p); returnp; } intthread_pool_add_worker(thread_pool_t*pool,void*(*process)(void*arg),void*arg){ thread_pool_t*p=pool; thread_worker_t*worker=NULL,*member=NULL; worker=(thread_worker_t*)malloc(sizeof(structthread_worker_s)); intincr=0; if(worker==NULL){ return-1; } worker->process=process; worker->arg=arg; worker->next=NULL; thread_pool_is_need_extend(pool); pthread_mutex_lock(&(p->queue_lock)); member=p->head; if(member!=NULL){ while(member->next!=NULL) member=member->next; member->next=worker; }else{ p->head=worker; } p->queue_size++; pthread_mutex_unlock(&(p->queue_lock)); pthread_cond_signal(&(p->queue_ready)); return1; } voidthread_pool_wait(thread_pool_t*pool){ thread_info_t*thread; inti=0; for(i=0;i<pool->num;i++){ thread=(thread_info_t*)(pool->threads+i); thread->state=THREAD_STATE_EXIT; pthread_join(thread->id,NULL); } } voidthread_pool_destory(thread_pool_t*pool){ thread_pool_t*p=pool; thread_worker_t*member=NULL; if(p->is_destroy) return; p->is_destroy=true; pthread_cond_broadcast(&(p->queue_ready)); thread_pool_wait(pool); free(p->threads); p->threads=NULL; /*销毁任务列表*/ while(p->head){ member=p->head; p->head=member->next; free(member); } /*销毁线程列表*/ thread_info_t*tmp=NULL; while(p->threads){ tmp=p->threads; p->threads=tmp->next; free(tmp); } pthread_mutex_destroy(&(p->queue_lock)); pthread_cond_destroy(&(p->queue_ready)); return; } /*通过线程id,找到对应的线程*/ thread_info_t*get_thread_by_id(thread_pool_t*pool,pthread_tid){ thread_info_t*thread=NULL; thread_info_t*p=pool->threads; while(p!=NULL){ if(p->id==id) returnp; p=p->next; } returnNULL; } /*每个线程入口函数*/ void*thread_excute_route(void*arg){ thread_worker_t*worker=NULL; thread_info_t*thread=NULL; thread_pool_t*p=(thread_pool_t*)arg; //printf("thread%lldcreatesuccess\n",pthread_self()); while(1){ pthread_mutex_lock(&(p->queue_lock)); /*获取当前线程的id*/ pthread_tpthread_id=pthread_self(); /*设置当前状态*/ thread=get_thread_by_id(p,pthread_id); /*线程池被销毁,并且没有任务了*/ if(p->is_destroy==true&&p->queue_size==0){ pthread_mutex_unlock(&(p->queue_lock)); thread->state=THREAD_STATE_EXIT; p->knum++; p->rnum--; pthread_exit(NULL); } if(thread){ thread->state=THREAD_STATE_TASK_WAITING;/*线程正在等待任务*/ } /*线程池没有被销毁,没有任务到来就一直等待*/ while(p->queue_size==0&&!p->is_destroy){ pthread_cond_wait(&(p->queue_ready),&(p->queue_lock)); } p->queue_size--; worker=p->head; p->head=worker->next; pthread_mutex_unlock(&(p->queue_lock)); if(thread) thread->state=THREAD_STATE_TASK_PROCESSING;/*线程正在执行任务*/ (*(worker->process))(worker->arg); if(thread) thread->state=THREAD_STATE_TASK_FINISHED;/*任务执行完成*/ free(worker); worker=NULL; } } /*拓展线程*/ void*thread_pool_is_need_extend(void*arg){ thread_pool_t*p=(thread_pool_t*)arg; thread_pool_t*pool=p; /*判断是否需要增加线程,最终目的线程:任务=1:2*/ if(p->queue_size>100){ intincr=0; if(((float)p->rnum/p->queue_size)<THREAD_BUSY_PERCENT){ incr=(p->queue_size*THREAD_BUSY_PERCENT)-p->rnum;/*计算需要增加线程个数*/ inti=0; thread_info_t*tmp=NULL; thread_pool_t*p=pool; pthread_mutex_lock(&pool->queue_lock); if(p->queue_size<100){ pthread_mutex_unlock(&pool->queue_lock); return; } for(i=0;i<incr;i++){ /*创建线程*/ tmp=(structthread_info_s*)malloc(sizeof(structthread_info_s)); if(tmp==NULL){ continue; }else{ tmp->next=p->threads; p->threads=tmp; } p->num++; p->rnum++; pthread_create(&(tmp->id),NULL,thread_excute_route,p); tmp->state=THREAD_STATE_RUN; } pthread_mutex_unlock(&pool->queue_lock); } } //pthread_cond_signal(&pool->extend_ready); } pthread_cond_tsum_ready; /*恢复初始线程个数*/ void*thread_pool_is_need_recovery(void*arg){ thread_pool_t*pool=(thread_pool_t*)arg; inti=0; thread_info_t*tmp=NULL,*prev=NULL,*p1=NULL; /*如果没有任务了,当前线程大于初始化的线程个数*/ while(1){ i=0; if(pool->queue_size==0&&pool->rnum>pool->init_num){ sleep(5); /*5s秒内还是这个状态的话就,销毁部分线程*/ if(pool->queue_size==0&&pool->rnum>pool->init_num){ pthread_mutex_lock(&pool->queue_lock); tmp=pool->threads; while((pool->rnum!=pool->init_num)&&tmp){ /*找到空闲的线程*/ if(tmp->state!=THREAD_STATE_TASK_PROCESSING){ i++; if(prev) prev->next=tmp->next; else pool->threads=tmp->next; pool->rnum--;/*正在运行的线程减一*/ pool->knum++;/*销毁的线程加一*/ kill(tmp->id,SIGKILL);/*销毁线程*/ p1=tmp; tmp=tmp->next; free(p1); continue; } prev=tmp; tmp=tmp->next; } pthread_mutex_unlock(&pool->queue_lock); printf("5s内没有新任务销毁部分线程,销毁了%d个线程\n",i); } } sleep(5); } } /*打印一些信息的*/ void*display_thread(void*arg){ thread_pool_t*p=(thread_pool_t*)arg; thread_info_t*thread=NULL; inti=0; while(1){ printf("threads%d,running%d,killed%d\n",p->num,p->rnum,p->knum);/*线程总数,正在跑的,已销毁的*/ thread=p->threads; while(thread){ printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]); thread=thread->next; } sleep(5); } }
希望本文所述对大家学习C语言程序设计有所帮助。