15 #define THREAD_POOL_VERSION "v2.0.0 (2021-08-14)"
27 #include <type_traits>
40 typedef std::uint_fast32_t ui32;
41 typedef std::uint_fast64_t ui64;
53 ThreadPool(
const ui32 &_thread_count = std::thread::hardware_concurrency())
54 : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()), threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()])
80 const std::scoped_lock lock(queue_mutex);
125 template <
typename T1,
typename T2,
typename F>
126 void parallelize_loop(
const T1 &first_index,
const T2 &index_after_last,
const F &loop, ui32 num_blocks = 0)
128 typedef std::common_type_t<T1, T2> T;
129 T the_first_index = (T)first_index;
130 T last_index = (T)index_after_last;
131 if (the_first_index == last_index)
133 if (last_index < the_first_index)
136 last_index = the_first_index;
137 the_first_index = temp;
141 num_blocks = thread_count;
142 ui64 total_size = (ui64)(last_index - the_first_index + 1);
143 ui64 block_size = (ui64)(total_size / num_blocks);
147 num_blocks = (ui32)total_size > 1 ? (ui32)total_size : 1;
149 std::atomic<ui32> blocks_running = 0;
150 for (ui32 t = 0; t < num_blocks; t++)
152 T start = ((T)(t * block_size) + the_first_index);
153 T end = (t == num_blocks - 1) ? last_index + 1 : ((T)((t + 1) * block_size) + the_first_index);
155 push_task([start, end, &loop, &blocks_running]
161 while (blocks_running != 0)
173 template <
typename F>
178 const std::scoped_lock lock(queue_mutex);
179 tasks.push(std::function<
void()>(task));
192 template <
typename F,
typename... A>
204 void reset(
const ui32 &_thread_count = std::thread::hardware_concurrency())
211 thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency();
212 threads.reset(
new std::thread[thread_count]);
227 template <
typename F,
typename... A,
typename = std::enable_if_t<std::is_void_v<std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>>>
228 std::future<bool>
submit(
const F &task,
const A &...args)
230 std::shared_ptr<std::promise<bool>> task_promise(
new std::promise<bool>);
231 std::future<bool> future = task_promise->get_future();
237 task_promise->set_value(true);
243 task_promise->set_exception(std::current_exception());
263 template <
typename F,
typename... A,
typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>,
typename = std::enable_if_t<!std::is_void_v<R>>>
264 std::future<R>
submit(
const F &task,
const A &...args)
266 std::shared_ptr<std::promise<R>> task_promise(
new std::promise<R>);
267 std::future<R> future = task_promise->get_future();
268 push_task([task, args..., task_promise]
272 task_promise->set_value(task(args...));
278 task_promise->set_exception(std::current_exception());
297 if (tasks_total == 0)
302 if (get_tasks_running() == 0)
316 std::atomic<bool> paused =
false;
321 ui32 sleep_duration = 1000;
331 void create_threads()
333 for (ui32 i = 0; i < thread_count; i++)
335 threads[i] = std::thread(&ThreadPool::worker,
this);
342 void destroy_threads()
344 for (ui32 i = 0; i < thread_count; i++)
356 bool pop_task(std::function<
void()> &task)
358 const std::scoped_lock lock(queue_mutex);
363 task = std::move(tasks.front());
373 void sleep_or_yield()
376 std::this_thread::sleep_for(std::chrono::microseconds(sleep_duration));
378 std::this_thread::yield();
388 std::function<void()> task;
389 if (!paused && pop_task(task))
408 mutable std::mutex queue_mutex = {};
413 std::atomic<bool> running =
true;
418 std::queue<std::function<void()>> tasks = {};
428 std::unique_ptr<std::thread[]> threads;
433 std::atomic<ui32> tasks_total = 0;
454 : out_stream(_out_stream){};
462 template <
typename... T>
465 const std::scoped_lock lock(stream_mutex);
466 (out_stream << ... << items);
475 template <
typename... T>
478 print(items...,
'\n');
485 mutable std::mutex stream_mutex = {};
490 std::ostream &out_stream;
504 typedef std::int_fast64_t i64;
512 start_time = std::chrono::steady_clock::now();
520 elapsed_time = std::chrono::steady_clock::now() - start_time;
530 return (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)).count();
537 std::chrono::time_point<std::chrono::steady_clock> start_time = std::chrono::steady_clock::now();
542 std::chrono::duration<double> elapsed_time = std::chrono::duration<double>::zero();
A C++17 thread pool class. The user submits tasks to be executed into a queue. Whenever a thread becomes available, it pops a task from the queue and executes it.
Definition: ThreadPool.hpp:39
~ThreadPool()
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
Definition: ThreadPool.hpp:62
std::atomic< bool > paused
An atomic variable indicating to the workers to pause. When set to true, the workers temporarily stop popping new tasks out of the queue.
Definition: ThreadPool.hpp:316
std::future< R > submit(const F &task, const A &...args)
Submit a function with zero or more arguments and a return value into the task queue, and get a future for its eventual returned value.
Definition: ThreadPool.hpp:264
ui32 get_thread_count() const
Get the number of threads in the pool.
Definition: ThreadPool.hpp:109
void wait_for_tasks()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue.
Definition: ThreadPool.hpp:291
std::future< bool > submit(const F &task, const A &...args)
Submit a function with zero or more arguments and no return value into the task queue, and get an std::future<bool> that will be set to true upon completion of the task.
Definition: ThreadPool.hpp:228
ui64 get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition: ThreadPool.hpp:78
void push_task(const F &task)
Push a function with no arguments or return value into the task queue.
Definition: ThreadPool.hpp:174
ui32 get_tasks_total() const
Get the total number of unfinished tasks - either still in the queue, or running in a thread.
Definition: ThreadPool.hpp:99
ThreadPool(const ui32 &_thread_count=std::thread::hardware_concurrency())
Construct a new thread pool.
Definition: ThreadPool.hpp:53
void parallelize_loop(const T1 &first_index, const T2 &index_after_last, const F &loop, ui32 num_blocks=0)
Parallelize a loop by splitting it into blocks, submitting each block separately to the thread pool, and waiting for all blocks to finish executing.
Definition: ThreadPool.hpp:126
ui32 get_tasks_running() const
Get the number of tasks currently being executed by the threads.
Definition: ThreadPool.hpp:89
void reset(const ui32 &_thread_count=std::thread::hardware_concurrency())
Reset the number of threads in the pool. Waits for all currently running tasks to be completed, then replaces the pool's threads with a new set sized to the new thread count.
Definition: ThreadPool.hpp:204
void push_task(const F &task, const A &...args)
Push a function with arguments, but no return value, into the task queue.
Definition: ThreadPool.hpp:193
A helper class to synchronize printing to an output stream by different threads.
Definition: ThreadPool.hpp:446
synced_stream(std::ostream &_out_stream=std::cout)
Construct a new synced stream.
Definition: ThreadPool.hpp:453
void print(const T &...items)
Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all use this synced_stream object to print.
Definition: ThreadPool.hpp:463
void println(const T &...items)
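Print any number of items into the output stream, followed by a newline character. Ensures that no other threads print to this stream simultaneously, as long as they all use this synced_stream object to print.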
Definition: ThreadPool.hpp:476
A helper class to measure execution time for benchmarking purposes.
Definition: ThreadPool.hpp:503
void start()
Start (or restart) measuring time.
Definition: ThreadPool.hpp:510
void stop()
Stop measuring time and store the elapsed time since start().
Definition: ThreadPool.hpp:518
i64 ms() const
Get the number of milliseconds that have elapsed between start() and stop().
Definition: ThreadPool.hpp:528
UFJF-MLTK main namespace for core functionalities.
Definition: classifier/Classifier.hpp:11