Linux C线程池简单实现实例
LinuxC线程池
三个文件
1tpool.h
typedefstructtpool_work{ void(*routine)(void*); void*arg; structtpool_work*next; }tpool_work_t; typedefstructtpool{ /*poolcharacteristics*/ intnum_threads; intmax_queue_size; /*poolstate*/ pthread_t*tpid; tpool_work_t*queue; intfront,rear; /*剩下的任务可以做完,但不能再加新的任务*/ intqueue_closed; /*剩下的任务都不做了,直接关闭*/ intshutdown; /*poolsynchronization*/ pthread_mutex_tqueue_lock; pthread_cond_tqueue_has_task; pthread_cond_tqueue_has_space; pthread_cond_tqueue_empty; }*tpool_t; voidtpool_init(tpool_t*tpoolp,intnum_threads,intmax_queue_size); inttpool_add_work(tpool_ttpool,void(*routine)(void*),void*arg); inttpool_destroy(tpool_ttpool,intfinish);
2tpool.c
#include#include #include #include #include #include #include"tpool.h" #defineDEBUG #ifdefined(DEBUG) #definedebug(...)do{\ flockfile(stdout);\ printf("###%p.%s:",(void*)pthread_self(),__func__);\ printf(__VA_ARGS__);\ putchar('\n');\ fflush(stdout);\ funlockfile(stdout);\ }while(0) #else #definedebug(...) #endif void*tpool_thread(void*); voidtpool_init(tpool_t*tpoolp,intnum_worker_threads,intmax_queue_size) { inti; tpool_tpool; pool=(tpool_t)malloc(sizeof(structtpool)); if(pool==NULL){ perror("malloc"); exit(0); } pool->num_threads=0; pool->max_queue_size=max_queue_size+1; pool->num_threads=num_worker_threads; pool->tpid=NULL; pool->front=0; pool->rear=0; pool->queue_closed=0; pool->shutdown=0; if(pthread_mutex_init(&pool->queue_lock,NULL)==-1){ perror("pthread_mutex_init"); free(pool); exit(0); } if(pthread_cond_init(&pool->queue_has_space,NULL)==-1){ perror("pthread_mutex_init"); free(pool); exit(0); } if(pthread_cond_init(&pool->queue_has_task,NULL)==-1){ perror("pthread_mutex_init"); free(pool); exit(0); } if(pthread_cond_init(&pool->queue_empty,NULL)==-1){ perror("pthread_mutex_init"); free(pool); exit(0); } if((pool->queue=malloc(sizeof(structtpool_work)* pool->max_queue_size))==NULL){ perror("malloc"); free(pool); exit(0); } if((pool->tpid=malloc(sizeof(pthread_t)*num_worker_threads))==NULL){ perror("malloc"); free(pool); free(pool->queue); exit(0); } for(i=0;i tpid[i],NULL,tpool_thread, (void*)pool)!=0){ perror("pthread_create"); exit(0); } } *tpoolp=pool; } intempty(tpool_tpool) { returnpool->front==pool->rear; } intfull(tpool_tpool) { return((pool->rear+1)%pool->max_queue_size==pool->front); } intsize(tpool_tpool) { return(pool->rear+pool->max_queue_size- pool->front)%pool->max_queue_size; } inttpool_add_work(tpool_ttpool,void(*routine)(void*),void*arg) { tpool_work_t*temp; pthread_mutex_lock(&tpool->queue_lock); while(full(tpool)&&!tpool->shutdown&&!tpool->queue_closed){ pthread_cond_wait(&tpool->queue_has_space,&tpool->queue_lock); } if(tpool->shutdown||tpool->queue_closed){ pthread_mutex_unlock(&tpool->queue_lock); return-1; } intis_empty=empty(tpool); temp=tpool->queue+tpool->rear; temp->routine=routine; temp->arg=arg; tpool->rear=(tpool->rear+1)%tpool->max_queue_size; if(is_empty){ debug("signalhastask"); pthread_cond_broadcast(&tpool->queue_has_task); } pthread_mutex_unlock(&tpool->queue_lock); return0; } void*tpool_thread(void*arg) { tpool_tpool=(tpool_t)(arg); tpool_work_t*work; for(;;){ pthread_mutex_lock(&pool->queue_lock); while(empty(pool)&&!pool->shutdown){ debug("I'msleep"); pthread_cond_wait(&pool->queue_has_task,&pool->queue_lock); } debug("I'mawake"); if(pool->shutdown==1){ debug("exit"); pthread_mutex_unlock(&pool->queue_lock); pthread_exit(NULL); } intis_full=full(pool); work=pool->queue+pool->front; pool->front=(pool->front+1)%pool->max_queue_size; if(is_full){ pthread_cond_broadcast(&pool->queue_has_space); } if(empty(pool)){ pthread_cond_signal(&pool->queue_empty); } pthread_mutex_unlock(&pool->queue_lock); (*(work->routine))(work->arg); } } inttpool_destroy(tpool_ttpool,intfinish) { inti; pthread_mutex_lock(&tpool->queue_lock); tpool->queue_closed=1; if(finish==1){ debug("waitallworkdone"); while(!empty(tpool)){ pthread_cond_wait(&tpool->queue_empty,&tpool->queue_lock); } } tpool->shutdown=1; pthread_mutex_unlock(&tpool->queue_lock); pthread_cond_broadcast(&tpool->queue_has_task); debug("waitworkerthreadexit"); for(i=0;i num_threads;i++){ pthread_join(tpool->tpid[i],NULL); } debug("freethreadpool"); free(tpool->tpid); free(tpool->queue); free(tpool); }
3tpooltest.c
#include#include #include"tpool.h" char*str[]={"string0","string1","string2", "string3","string4","string5"}; voidjob(void*jobstr) { longi,x; for(i=0;i<100000000;i++){ x=x+i; } printf("%s\n",(char*)jobstr); } intmain(void) { inti; tpool_ttest_pool; tpool_init(&test_pool,8,20); for(i=0;i<5;i++){ tpool_add_work(test_pool,job,str[i]); } tpool_destroy(test_pool,1); return0; }
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!