Update bs_thread_pool to 3.5.0

This commit is contained in:
Marek Roszko 2023-12-17 21:25:31 -05:00
parent 83f4597f05
commit 33f75fbd0a
1 changed files with 104 additions and 49 deletions

View File

@ -3,16 +3,15 @@
/**
* @file BS_thread_pool.hpp
* @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)
* @version 3.3.0
* @date 2022-08-03
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
* @version 3.5.0
* @date 2023-05-25
* @copyright Copyright (c) 2023 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
*
* @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer.
*/
#define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03)"
#define BS_THREAD_POOL_VERSION "v3.5.0 (2023-05-25)"
#include <atomic> // std::atomic
#include <chrono> // std::chrono
#include <condition_variable> // std::condition_variable
#include <exception> // std::current_exception
@ -286,17 +285,18 @@ public:
[[nodiscard]] size_t get_tasks_running() const
{
const std::scoped_lock tasks_lock(tasks_mutex);
return tasks_total - tasks.size();
return tasks_running;
}
/**
* @brief Get the total number of unfinished tasks: either still in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
* @brief Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
*
* @return The total number of tasks.
*/
[[nodiscard]] size_t get_tasks_total() const
{
return tasks_total;
const std::scoped_lock tasks_lock(tasks_mutex);
return tasks_running + tasks.size();
}
/**
@ -316,6 +316,7 @@ public:
*/
[[nodiscard]] bool is_paused() const
{
const std::scoped_lock tasks_lock(tasks_mutex);
return paused;
}
@ -372,9 +373,20 @@ public:
*/
void pause()
{
const std::scoped_lock tasks_lock(tasks_mutex);
paused = true;
}
/**
* @brief Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.
*/
void purge()
{
const std::scoped_lock tasks_lock(tasks_mutex);
while (!tasks.empty())
tasks.pop();
}
/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
*
@ -393,9 +405,8 @@ public:
blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
if (blks.get_total_size() > 0)
{
F internal_loop = loop;
for (size_t i = 0; i < blks.get_num_blocks(); ++i)
push_task(std::forward<F>(internal_loop), blks.start(i), blks.end(i));
push_task(std::forward<F>(loop), blks.start(i), blks.end(i));
}
}
@ -425,12 +436,10 @@ public:
template <typename F, typename... A>
void push_task(F&& task, A&&... args)
{
std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
{
const std::scoped_lock tasks_lock(tasks_mutex);
tasks.push(task_function);
tasks.push(std::bind(std::forward<F>(task), std::forward<A>(args)...)); // cppcheck-suppress ignoredReturnValue
}
++tasks_total;
task_available_cv.notify_one();
}
@ -441,8 +450,10 @@ public:
*/
void reset(const concurrency_t thread_count_ = 0)
{
std::unique_lock tasks_lock(tasks_mutex);
const bool was_paused = paused;
paused = true;
tasks_lock.unlock();
wait_for_tasks();
destroy_threads();
thread_count = determine_thread_count(thread_count_);
@ -464,10 +475,9 @@ public:
template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
[[nodiscard]] std::future<R> submit(F&& task, A&&... args)
{
std::function<R()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
push_task(
[task_function, task_promise]
[task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...), task_promise]
{
try
{
@ -500,6 +510,7 @@ public:
*/
void unpause()
{
const std::scoped_lock tasks_lock(tasks_mutex);
paused = false;
}
@ -508,12 +519,48 @@ public:
*/
void wait_for_tasks()
{
std::unique_lock tasks_lock(tasks_mutex);
waiting = true;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_done_cv.wait(tasks_lock, [this] { return (tasks_total == (paused ? tasks.size() : 0)); });
tasks_done_cv.wait(tasks_lock, [this] { return !tasks_running && (paused || tasks.empty()); });
waiting = false;
}
/**
* @brief Wait for tasks to be completed, but stop waiting after the specified duration has passed.
*
* @tparam R An arithmetic type representing the number of ticks to wait.
* @tparam P An std::ratio representing the length of each tick in seconds.
* @param duration The time duration to wait.
* @return true if all tasks finished running, false if the duration expired but some tasks are still running.
*/
template <typename R, typename P>
bool wait_for_tasks_duration(const std::chrono::duration<R, P>& duration)
{
std::unique_lock tasks_lock(tasks_mutex);
waiting = true;
const bool status = tasks_done_cv.wait_for(tasks_lock, duration, [this] { return !tasks_running && (paused || tasks.empty()); });
waiting = false;
return status;
}
/**
* @brief Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
*
* @tparam C The type of the clock used to measure time.
* @tparam D An std::chrono::duration type used to indicate the time point.
* @param timeout_time The time point at which to stop waiting.
* @return true if all tasks finished running, false if the time point was reached but some tasks are still running.
*/
template <typename C, typename D>
bool wait_for_tasks_until(const std::chrono::time_point<C, D>& timeout_time)
{
std::unique_lock tasks_lock(tasks_mutex);
waiting = true;
const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time, [this] { return !tasks_running && (paused || tasks.empty()); });
waiting = false;
return status;
}
private:
// ========================
// Private member functions
@ -524,7 +571,10 @@ private:
*/
void create_threads()
{
running = true;
{
const std::scoped_lock tasks_lock(tasks_mutex);
workers_running = true;
}
for (concurrency_t i = 0; i < thread_count; ++i)
{
threads[i] = std::thread(&thread_pool::worker, this);
@ -536,7 +586,10 @@ private:
*/
void destroy_threads()
{
running = false;
{
const std::scoped_lock tasks_lock(tasks_mutex);
workers_running = false;
}
task_available_cv.notify_all();
for (concurrency_t i = 0; i < thread_count; ++i)
{
@ -550,7 +603,7 @@ private:
* @param thread_count_ The parameter passed to the constructor or reset(). If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread.
* @return The number of threads to use for constructing the pool.
*/
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_)
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_) const
{
if (thread_count_ > 0)
return thread_count_;
@ -568,22 +621,24 @@ private:
*/
void worker()
{
while (running)
std::function<void()> task;
while (true)
{
std::function<void()> task;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !running; });
if (running && !paused)
{
task = std::move(tasks.front());
tasks.pop();
tasks_lock.unlock();
task();
tasks_lock.lock();
--tasks_total;
if (waiting)
task_done_cv.notify_one();
}
std::unique_lock tasks_lock(tasks_mutex);
task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !workers_running; });
if (!workers_running)
break;
if (paused)
continue;
task = std::move(tasks.front());
tasks.pop();
++tasks_running;
tasks_lock.unlock();
task();
tasks_lock.lock();
--tasks_running;
if (waiting && !tasks_running && (paused || tasks.empty()))
tasks_done_cv.notify_all();
}
}
@ -592,24 +647,19 @@ private:
// ============
/**
* @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks.
* @brief A flag indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks.
*/
std::atomic<bool> paused = false;
bool paused = false;
/**
* @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working.
*/
std::atomic<bool> running = false;
/**
* @brief A condition variable used to notify worker() that a new task has become available.
* @brief A condition variable to notify worker() that a new task has become available.
*/
std::condition_variable task_available_cv = {};
/**
* @brief A condition variable used to notify wait_for_tasks() that a tasks is done.
* @brief A condition variable to notify wait_for_tasks() that the tasks are done.
*/
std::condition_variable task_done_cv = {};
std::condition_variable tasks_done_cv = {};
/**
* @brief A queue of tasks to be executed by the threads.
@ -617,9 +667,9 @@ private:
std::queue<std::function<void()>> tasks = {};
/**
* @brief An atomic variable to keep track of the total number of unfinished tasks - either still in the queue, or running in a thread.
* @brief A counter for the total number of currently running tasks.
*/
std::atomic<size_t> tasks_total = 0;
size_t tasks_running = 0;
/**
* @brief A mutex to synchronize access to the task queue by different threads.
@ -637,9 +687,14 @@ private:
std::unique_ptr<std::thread[]> threads = nullptr;
/**
* @brief An atomic variable indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
* @brief A flag indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
*/
std::atomic<bool> waiting = false;
bool waiting = false;
/**
* @brief A flag indicating to the workers to keep running. When set to false, the workers terminate permanently.
*/
bool workers_running = false;
};
// End class thread_pool //