#include <cstdint>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <cassert>

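// TaskPoolImpl owns the worker threads and the pending-task queue. Tasks are
// stored as type-erased std::function<void()> objects; a mutex plus condition
// variable guard the queue and wake idle workers when work arrives or the
// pool is stopped.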
class TaskPoolImpl
{
public:
    TaskPoolImpl(uint32_t num_threads) :
        m_stop(false)
    {
        for (uint32_t i = 0; i < num_threads; ++i)
            m_threads.emplace_back(Worker, this);
    }

    ~TaskPoolImpl()
    {
        Stop();
    }

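    // Wrap the callable in a std::packaged_task so a future can be handed back
    // to the caller, then push a type-erased closure onto the queue and wake
    // one worker.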
    template<typename F, typename... Args>
    std::future<typename std::result_of<F(Args...)>::type>
    AddTask(F&& f, Args&&... args)
    {
        auto task = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::unique_lock<std::mutex> lock(m_tasks_mutex);
        assert(!m_stop && "Can't add task to TaskPool after it is stopped");
        m_tasks.emplace([task](){ (*task)(); });
        lock.unlock();
        m_tasks_cv.notify_one();

        return task->get_future();
    }

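    // Signal shutdown and wait for every worker to exit. Workers keep running
    // queued tasks after m_stop is set; they only quit once the queue is empty.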
    void
    Stop()
    {
        std::unique_lock<std::mutex> lock(m_tasks_mutex);
        m_stop = true;
        lock.unlock(); // release via the guard so the lock is not unlocked twice on scope exit
        m_tasks_cv.notify_all();
        for (auto& t : m_threads)
            t.join();
    }

private:
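    // Worker loop: each thread blocks on the condition variable until a task
    // is queued or Stop() is called, runs tasks outside the lock, and exits
    // once the queue is empty after shutdown.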
    static void
    Worker(TaskPoolImpl* pool)
    {
        while (true)
        {
            std::unique_lock<std::mutex> lock(pool->m_tasks_mutex);
            if (pool->m_tasks.empty())
                pool->m_tasks_cv.wait(lock, [pool](){ return !pool->m_tasks.empty() || pool->m_stop; });
            if (pool->m_tasks.empty())
                break;

            std::function<void()> f = pool->m_tasks.front();
            pool->m_tasks.pop();
            lock.unlock();

            f();
        }
    }

    std::queue<std::function<void()>> m_tasks;
    std::mutex m_tasks_mutex;
    std::condition_variable m_tasks_cv;
    bool m_stop;
    std::vector<std::thread> m_threads;
};

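// TaskPool is a static facade over a single process-wide TaskPoolImpl sized to
// std::thread::hardware_concurrency(); the implementation is created lazily on
// first use.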
class TaskPool
{
public:
    // Add a new task to the thread pool and return a std::future belonging to the newly created task.
    // The caller of this function has to wait on the future for this task to complete.
    template<typename F, typename... Args>
    static std::future<typename std::result_of<F(Args...)>::type>
    AddTask(F&& f, Args&&... args)
    {
        return GetImplementation().AddTask(std::forward<F>(f), std::forward<Args>(args)...);
    }

    // Run all of the specified tasks on the thread pool and wait until all of
    // them are finished before returning.
    template<typename... T>
    static void
    RunTasks(T&&... t)
    {
        RunTaskImpl<T...>::Run(std::forward<T>(t)...);
    }

private:
    static TaskPoolImpl&
    GetImplementation()
    {
        static TaskPoolImpl g_task_pool_impl(std::thread::hardware_concurrency());
        return g_task_pool_impl;
    }

    template<typename... T>
    struct RunTaskImpl;
};

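// RunTaskImpl peels the task pack apart one callable at a time: it submits the
// head, recurses on the tail, and then waits on the head's future, so every
// task is submitted before any wait and RunTasks() returns only after all of
// them have finished.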
template<typename H, typename... T>
struct TaskPool::RunTaskImpl<H, T...>
{
    static void
    Run(H&& h, T&&... t)
    {
        auto f = AddTask(std::forward<H>(h));
        RunTaskImpl<T...>::Run(std::forward<T>(t)...);
        f.wait();
    }
};

template<>
struct TaskPool::RunTaskImpl<>
{
    static void
    Run() {}
};

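// Exercise the pool: enqueue 100000 small summation tasks, wait on every
// future, then run two heterogeneous lambdas through RunTasks().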
int main()
{
    std::vector<std::future<uint32_t>> tasks;
    for (int i = 0; i < 100000; ++i)
    {
        tasks.emplace_back(TaskPool::AddTask([](int i){
            uint32_t s = 0;
            for (int j = 0; j <= i; ++j)
                s += j;
            return s;
        },
        i));
    }

    for (auto& it : tasks) // Set breakpoint here
        it.wait();

    TaskPool::RunTasks(
        []() { return 1; },
        []() { return "aaaa"; }
    );
}