C++的并发世界(十一)——线程池

news/2024/6/2 20:12:17 标签: c++, java, jvm

0.线程池的概念

在这里插入图片描述

1.线程池使用步骤

①初始化线程池:确定线程数量,并做好互斥访问;
②启动所有线程
③准备好任务处理基类;
④获取任务接口:通过条件变量阻塞等待任务

2.atomic原子操作

'std:atomic`是C++11标准库中的一个模板类,用于实现多线程环境下的原子操作。它提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中的数据竞争问题,'std:atomic’的使用方式类似于普通的C++变量,但是它的操作是原子性的。也就是说,在多线程环境下,多个线程同时对同一个’std:atomic变量进行操作时,不会出现数据竞争问题。

3.线程池案例

①threadpool.cpp

#include "threadpool.h"


void XThreadPool::Init(int num)
{
	std::unique_lock<std::mutex> lock(__mux__);
	thread_num = num;
	std::cout << "Thread pool Init: " << num << std::endl;
}


void XThreadPool::Start()
{
	std::unique_lock<std::mutex> lock(__mux__);
	
	if (thread_num <= 0)
	{
		std::cerr << "Please Init XThreadPool !" << std::endl;
		return;
	}

	if (!threads.empty())
	{
		std::cerr << "Thread Pool has start!" << std::endl;
		return;
	}

	for (int i = 0; i < thread_num; i++)
	{
		auto th = std::make_shared<std::thread>(&XThreadPool::Run, this);
		threads.push_back(th);
	}
}

void XThreadPool::Run()
{
	std::cout << "begin XThreadPool Run: " << std::this_thread::get_id() << std::endl;
	
	while (true)
	{
		auto task = GetTask();
		if (!task)
		{
			continue;
		}
		
		++__task_run_count__;

		try
		{
			auto re = task->Run();
			task->Setvalue(re);
		}
		catch (...)
		{
			
		}
		--__task_run_count__;
	}

	std::cout << "end XThreadPool Run: " << std::this_thread::get_id() << std::endl;
}

void XThreadPool::AddTask(XTask *task)
{
	std::unique_lock<std::mutex> lock(__mux__);
	tasks.push_back(task);
	task->is_exit = [this] {return is_exit(); }
}

XTask* XThreadPool::GetTask()
{
	std::unique_lock<std::mutex> lock(__mux__);
	if (tasks.empty())
	{
		__cv__.wait(lock);
	}

	auto task = tasks.front();
	tasks.pop_front();
	return task;
}

void XThreadPool::Stop()
{
	exit = true;
	__cv__.notify_all();
	for (auto &th : threads)
	{
		th->join();
	}

	std::unique_lock<std::mutex> lock(__mux__);
	threads.clear();
}

②threadpool.h

#pragma once

#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <iostream>
#include <string>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>

class XTask
{
	public:
		virtual int Run() = 0;
		std::function<bool()> is_exit = nullptr;
		void Setvalue(int v) { __p__.set_value(v); }
		auto GetValue() { return __p__.get_future().get(); }
	private:
		std::promise<int> __p__;//用来接收返回值
};

class XThreadPool
{
	public:
		void Init(int num);
		void Start();//所有线程启动函数
		void Stop();//线程池退出
		void AddTask(XTask *task);
		XTask* GetTask();
		bool is_exit() { return exit; }
		int task_run_count() { return __task_run_count__; }

	private:
		int thread_num = 0;
		std::mutex __mux__;
		void Run();//线程池线程入口函数
		std::vector<std::shared_ptr<std::thread>> threads;
		std::list<XTask*> tasks;
		std::condition_variable __cv__;
		bool exit = false;
		std::atomic<int> __task_run_count__ = {0};//正在运行的任务数量
};

③main.cpp

#include "threadpool.h"

class MyTask :public XTask
{
	public:
		int Run()
		{
			std::cout <<"==============================================" << std::endl;
			std::cout << std::this_thread::get_id() << "-Mytask" << name << std::endl;
			std::cout << "==============================================" << std::endl;

			for (int i = 0; i < 10; i++)
			{
				if(is_exit())
				{
					break;
				}

				std::cout << "." << std::flush;
				std::this_thread::sleep_for(std::chrono::microseconds(500));
			}

			return 0;
		}

		std::string name = "";
};

int main()
{
	XThreadPool pool;
	pool.Init(16);
	pool.Start();

	MyTask task1;
	task1.name = "test name 001";
	pool.AddTask(&task1);

	MyTask task2;
	task2.name = "test name 002";
	pool.AddTask(&task2);
	std::this_thread::sleep_for(std::chrono::seconds(100));
	std::cout << "task run count =" << pool.task_run_count() << std::endl;

	MyTask task3;
	task3.name = "test name 003";
	pool.AddTask(&task3);

	MyTask task4;
	task4.name = "test name 004";
	pool.AddTask(&task4);
	std::cout << "task run count = " << pool.task_run_count() << std::endl;

	std::this_thread::sleep_for(std::chrono::seconds(1));
	pool.Stop();
	std::cout << "task run count =" << pool.task_run_count() << std::endl;

	getchar();
	return 0;
}

http://www.niftyadmin.cn/n/5480646.html

相关文章

C/C++ inline 函数

C/C中包含了一类inline函数&#xff0c;其只是单纯在原本函数申明或者定义前面多了一个inline 但是带来含义的确实不一样的。 如果不带inline那么主函数执行到函数入口处会跳到相应的函数代码除继续执行&#xff0c;在内存 中的代码段内存中这些代码不是连续的&#xff0c;这样…

Nifi同步过程中报错create_time字段找不到_实际目标表和源表中没有这个字段---大数据之Nifi工作笔记0066

很奇怪的问题,在使用nifi的时候碰到的,这里是用NIFI,把数据从postgresql中同步到mysql中, 首先postgresql中的源表,中是没有create_time这个字段的,但是同步的过程中报错了. 报错的内容是说,目标表中有个create_time字段,这个字段是必填的,但是传过来的flowfile文件中,的数据没…

关于moviepy出现must be real number, not NoneType

#降级装饰器版本 decorator4.4.2 不行就再降4.0.2 有的也可以通过省级mv解决更多解答https://stackoverflow.com/questions/68032884/getting-typeerror-must-be-real-number-not-nonetype-whenever-trying-to-run-wr

查询category的id存在于项目表中category_id_list的json array字段中

表category 表project 查询category的id存在于项目表中category_id_list的json array字段中。

LDO原理以及相关性能参数

LDO概述 LDO是Low Dropout Regulator的缩写&#xff0c;意思是低压差线性稳压器。 LDO的性能特点&#xff1a; 低压差是指输入电压与输出电压的差值比较低&#xff1b; 线性是指MOS基本处于线性工作状态&#xff1b; 稳压器是指在正常的VIN范围内&#xff0c;输出VOUT都稳…

dynamicreports示例

1. 简单段落文本报表 //标题样式StyleBuilder titleStyle DynamicReports.stl.style().setHorizontalTextAlignment(HorizontalTextAlignment.CENTER)//设置对齐方式.setFontSize(50)//设置字体.setBackgroundColor(Color.CYAN);//设置背景颜色//段落样式StyleBuilder paragra…

面试算法-166-排序链表

题目 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [4,2,1,3] 输出&#xff1a;[1,2,3,4] 解 class Solution {public ListNode sortList(ListNode head) {if (head null || head.next null…

无尘净化棉签:清洁革新的里程碑

随着科技的不断进步&#xff0c;日常生活中的许多小物件也在不断地得到创新和改良。其中&#xff0c;棉签作为一种常见的清洁工具&#xff0c;经历了从传统到现代的革新&#xff0c;引入了无尘棉签的概念&#xff0c;为清洁领域带来了一场革命性的变革。本文优斯特将探讨无尘棉…