【程式碼備份】C語言執行緒池傳參成功

2020-08-11 22:43:48

main.c

#include "thread_pool.h"

void *mytask(void *arg1, void *arg2)
{
	long n = (long)arg1;
	printf("第二個參數是 is %s\n", (char *)arg2);
	printf("執行緒id爲[%ld]的執行緒準備工作 %ld 秒...\n",
		pthread_self(), n);

	sleep(n);

	printf("執行緒id爲[%ld]的執行緒工作 %ld 秒結束了******************\n",
		pthread_self(), n);

	return NULL;
}

void *count_time(void *arg)
{
	int i = 0;
	while(1)
	{
		sleep(1);
		printf("當前是第%d秒\n", ++i);
	}
}

int main(void)
{
	pthread_t a;
	
	//只是用來建立一天用於計時的執行緒,跟執行緒池一點關係都沒有
	pthread_create(&a, NULL, count_time, NULL);

	thread_pool *pool = malloc(sizeof(thread_pool));
	
	// 1, 初始化執行緒池,並且一開始就有兩條執行緒
	init_pool(pool, 2);

	// 2, 任務投放
	printf("投放3個任務\n");
	add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("hhhhh"));
	add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("xixixi"));
	add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("heiheihei"));

	// 3, 檢查當前執行緒數量
	printf("當前執行緒數量爲:%d\n",
					remove_thread(pool, 0));
	sleep(9);
	
	//增加執行緒數量
	add_thread(pool, 2);

	sleep(5);

	// 6, 刪除執行緒(隨意,因爲刪除執行緒池的時候,會全部刪除執行緒)
	printf("刪除3條執行緒,當前執行緒數還剩: %d\n",
						remove_thread(pool, 3));

	// 7, 刪除執行緒池
	destroy_pool(pool);
	
	//8、釋放執行緒池資源
	free(pool);
	
	return 0;
}

thread_pool.c

#include "thread_pool.h"

void handler(void *arg)				
{

	pthread_mutex_unlock((pthread_mutex_t *)arg);	//防止死鎖,所以在這裏新增解鎖操作
}

/* 每一個執行緒池中的執行緒所執行的內容, arg就是執行緒池的地址 */
void *routine(void *arg)
{
	thread_pool *pool = (thread_pool *)arg;		//將執行緒池的地址存放進去pool	
	struct task *p;								//定義一個緩衝指針,後期任務佇列遍歷的時候使用

	while(1)
	{

		pthread_cleanup_push(handler, (void *)&pool->lock);//提前登記執行緒被取消後需要處理的事情		
		pthread_mutex_lock(&pool->lock);					//由於需要操作執行緒池中的共有資源,所以加鎖		


		while(pool->waiting_tasks == 0 && !pool->shutdown)	//判斷是否沒有需要執行的任務		
		{
			pthread_cond_wait(&pool->cond, &pool->lock);	//讓執行緒睡眠		
		}


		if(pool->waiting_tasks == 0 && pool->shutdown == true)	//判斷執行緒池是否沒有任務並且需要關閉	
		{
			pthread_mutex_unlock(&pool->lock);	//解鎖
			pthread_exit(NULL);					//退出線程					
		}

		p = pool->task_list->next;			//讓p登記需要執行的任務節點	
		
		pool->task_list->next = p->next;	//將此任務節點從鏈表中刪除	
		
		pool->waiting_tasks--;				//將等待執行的任務佇列-1								

		pthread_mutex_unlock(&pool->lock);	//解鎖						
		pthread_cleanup_pop(0);				//解除登記取消執行緒之後所做的函數									

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);		//忽略執行緒的取消操作
		(p->task)(p->myarg1, p->myarg2);											//函數呼叫										
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);		//重新啓動執行緒取消操作

		free(p);													//釋放任務節點
	}

	pthread_exit(NULL);												
}

/* pool:執行緒池結構體的地址,threads_number:一開始初始化的執行緒數量 */
bool init_pool(thread_pool *pool, unsigned int threads_number)			
{
	
	pthread_mutex_init(&pool->lock, NULL);			//初始化互斥鎖				
	pthread_cond_init(&pool->cond, NULL);			//初始化條件變數				

	pool->shutdown = false;							//開啓執行緒池的標誌false:開	
	pool->task_list = malloc(sizeof(struct task));	//申請任務鏈表頭					
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//申請最大執行緒數量的ID記憶體	

	if(pool->task_list == NULL || pool->tids == NULL)		//判斷兩個申請的記憶體是否成功		
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;	//初始化鏈表頭,將下一個節點指向NULL								

	pool->waiting_tasks = 0;		//將等待執行的任務數量置0								
	pool->active_threads = threads_number;	//登記當前的執行緒數量						

	int i;
	
	for(i=0; i<pool->active_threads; i++)				//建立執行緒池裏面的執行緒			
	{
		if(pthread_create(&((pool->tids)[i]), NULL,
					routine, (void *)pool) != 0)		//每一個執行緒都去跑routine這個函數的內容
		{
			perror("create threads error");
			return false;
		}
	}

	return true;
}


/* 投放任務:pool:執行緒池地址;task:任務需要執行的內容的函數指針; arg:傳入給task函數的參數 */
 bool add_task(thread_pool *pool,
			void *(*fun_name)(void *myarg1, void *myarg2), void *arg1, void *arg2) 
{
	struct task *new_task = malloc(sizeof(struct task));	//新建一個任務節點
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}
	new_task->task = fun_name;								//將任務需要做的函數存進task指針中
	new_task->myarg1 = arg1;								//將任務函數參數記錄在arg裏面
	new_task->myarg2 = arg2;								//將任務函數參數記錄在arg裏面
	
	new_task->next = NULL;								//將任務節點的下一個位置指向NULL


	pthread_mutex_lock(&pool->lock);					//上鎖
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)		//判斷任務數量有沒有超標
	{
		pthread_mutex_unlock(&pool->lock);				//解鎖

		fprintf(stderr, "too many tasks.\n");			//反饋太多工了
		free(new_task);									//釋放掉剛纔登記的任務節點

		return false;									//返回新增不了任務到任務鏈表中
	}
	
	struct task *tmp = pool->task_list;					//將執行緒池中任務鏈表的頭節點登記到tmp
	while(tmp->next != NULL)							//將tmp指向最後的節點的位置
		tmp = tmp->next;

	tmp->next = new_task;								//將新建的任務節點插入到鏈表中
	pool->waiting_tasks++;								//將等待的任務數量+1

	pthread_mutex_unlock(&pool->lock);					//解鎖
	pthread_cond_signal(&pool->cond);					//喚醒正在睡眠中的執行緒

	return true;										//返回新增成功
}

/* 新增執行緒到執行緒池中 pool:執行緒池結構體地址, additional_threads:新增的執行緒的數量 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	if(additional_threads == 0)
		return 0;

	unsigned total_threads =
		     pool->active_threads + additional_threads;	//將總數記錄在這個變數中

	int i, actual_increment = 0;
	for(i = pool->active_threads;
	    i < total_threads && i < MAX_ACTIVE_THREADS;
	    i++)
	{
		if(pthread_create(&((pool->tids)[i]),
				NULL, routine, (void *)pool) != 0)	//新建執行緒
		{
			perror("add threads error");

			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; 					//記錄成功新增了多少條執行緒到執行緒池中
	}

	pool->active_threads += actual_increment;	//將最後成功新增到執行緒池的執行緒總數記錄執行緒池中
	return actual_increment;					//返回新建了多少條執行緒
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if(removing_threads == 0)
		return pool->active_threads;

	int remain_threads = pool->active_threads - removing_threads;	//將移除執行緒之後的執行緒數量登記下來
	remain_threads = remain_threads>0 ? remain_threads:1;			//如果這個數量不大於0,則把它置1

	int i;
	for(i=pool->active_threads-1; i>remain_threads-1; i--)			//從id的最後一位執行緒開始取消
	{
		errno = pthread_cancel(pool->tids[i]);
		if(errno != 0)
			break;
	}

	if(i == pool->active_threads-1)				//判斷髮送取消信號是否成功
		return -1;
	else
		pool->active_threads = i+1;					//將新的執行緒數量登記active_threads
	
	return pool->active_threads;					//返回剩下多少條執行緒線上程池中
}

bool destroy_pool(thread_pool *pool)
{

	pool->shutdown = true;					//使能執行緒池的退出開關
	pthread_cond_broadcast(&pool->cond);	//將所有的執行緒全部喚醒

	int i;
	for(i=0; i<pool->active_threads; i++)	//開始接合執行緒
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	free(pool->task_list);		//釋放掉任務頭節點
	free(pool->tids);			//釋放掉執行緒ID記憶體

	return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000	//最大的等待任務數量
#define MAX_ACTIVE_THREADS	20		//最大的執行緒數量

struct task					//任務鏈表結構體
{
	void *(*task)(void *arg1,void *arg2);	//任務做什麼事情的函數指針
	void *myarg1;					//傳入函數的參數
	void *myarg2;					//傳入函數的參數
	
	struct task *next;			//鏈表的位置結構體指針
};


typedef struct thread_pool
{
	pthread_mutex_t lock;		//用於執行緒池同步互斥的互斥鎖
	pthread_cond_t  cond;		//用於讓執行緒池裏面的執行緒睡眠的條件變數
	struct task *task_list;		//執行緒池的執行任務鏈表

	pthread_t *tids;			//執行緒池裏面執行緒的ID登記處

	unsigned waiting_tasks;		//等待的任務數量,也就是上面任務鏈表的長度
	unsigned active_threads;	//當前已經建立的執行緒數

	bool shutdown;				//執行緒池的開關
}thread_pool;


bool
init_pool(thread_pool *pool,
          unsigned int threads_number);		//初始化執行緒池

bool
add_task(thread_pool *pool,
         void *(*fun_name)(void *myarg1, void *myarg2),
         void *arg1, void *arg2);						//往執行緒池裏面新增任務節點 
int 
add_thread(thread_pool *pool,
           unsigned int additional_threads_number);	//新增執行緒池中執行緒的數量

int 
remove_thread(thread_pool *pool,
              unsigned int removing_threads_number);	//移除執行緒

bool destroy_pool(thread_pool *pool);			//銷燬執行緒池
void *routine(void *arg);						//執行緒池裏面執行緒的執行函數

#endif

Makefile

CC = gcc
CFLAGS = -O0 -Wall -g -lpthread

test:main.c thread_pool.c
	$(CC) $^ -o $@ $(CFLAGS)

debug:main.c thread_pool.c
	$(CC) $^ -o $@ $(CFLAGS) -DDEBUG

clean:
	$(RM) .*.sw? test debug *.o

.PHONY:all clean

文章來源:微信公衆號【圖控大叔】