jubilant-funicular
ThreadPool.cpp
1 #include "nta/ThreadPool.h"
2 
3 namespace nta {
4  namespace utils {
5  ThreadPool::ThreadPool(size_t num_threads) : m_workers(num_threads), m_kill(false),
6  m_available(num_threads) {
7  m_dispatch_thread = std::thread([this]{ dispatcher(); });
8  for (size_t id = 0; id < num_threads; id++) {
9  m_workers[id].t = std::thread([this, id]{ worker_func(id); });
10  }
11  }
// Body of the "is all work finished?" check.
// NOTE(review): the signature line is not present in this listing; the symbol
// index at the end of the listing gives it as `bool allWorkFinished()`.
// Yields true only when every worker is marked free AND the queue of pending
// functions is empty.
// NOTE(review): used as the predicate of the wait on m_empty.cv, so it is
// presumably evaluated with m_empty.m held -- confirm for any other caller.
13  for (size_t i = 0; i < m_workers.size(); i++) {
// A single worker that is not free means work is still in flight.
14  if (!m_workers[i].free) return false;
15  }
// Every worker is free: finished exactly when nothing is left in the queue.
16  return m_funcs.empty();
17  }
// Body of the "find an available worker" routine.
// NOTE(review): the signature line is not present in this listing; the symbol
// index at the end of the listing gives it as `size_t getAvailableWorker()`.
// Waits on the m_available semaphore, then claims the first worker whose
// `free` flag is set and returns its index.
19  m_available.wait();
// Linear scan for the first free worker.
20  for (size_t i = 0; i < m_workers.size(); i++) {
21  if (m_workers[i].free) {
// Claim the worker before returning so it is not handed out again.
22  m_workers[i].free = false;
23  return i;
24  }
25  }
// NOTE(review): if the scan finds no free worker, control reaches the end of
// the function without a return statement; for a function returning size_t
// that is undefined behavior. Consider an assert/abort here.
26  // Should never get here
27  }
// Body of the dispatch-thread routine.
// NOTE(review): the signature line is not present in this listing; the symbol
// index at the end of the listing gives it as `void dispatcher()`.
// Repeatedly waits on m_scheduled, picks an available worker, moves the
// function at the front of m_funcs into that worker's slot and wakes the
// worker. Leaves the loop once m_kill is set and then wakes every worker.
30  while (true) {
// Block until m_scheduled is signalled.
31  m_scheduled.wait();
// The kill flag is checked after every wake-up, before touching the queue.
32  if (m_kill) break;
// Waits for, and claims, a free worker (its `free` flag is cleared there).
33  size_t id = getAvailableWorker();
34 
// m_empty.m is held while the queue is read and popped.
35  m_empty.m.lock();
// Copy the front function into the worker's thunk slot, then drop it from the queue.
36  m_workers[id].thunk = m_funcs.front();
37  m_funcs.pop();
38  m_empty.m.unlock();
39 
// Wake the chosen worker, which waits on its own `task` semaphore.
40  m_workers[id].task.signal();
41  }
// Each worker waits on its `task` semaphore and checks m_kill after waking,
// so one signal per worker lets it observe the flag and leave its loop.
42  // Notify workers to die
43  for (auto& w : m_workers) {
44  w.task.signal();
45  }
46  }
47  void ThreadPool::worker_func(size_t wid) {
48  auto& me = m_workers[wid];
49  while (true) {
50  me.task.wait();
51  if (m_kill) break;
52  me.thunk();
53  me.free = true;
54 
56 
57  m_empty.m.lock();
58  m_empty.cv.notify_all();
59  m_empty.m.unlock();
60  }
61  }
62  void ThreadPool::schedule(const Thunk& thunk) {
63  m_empty.m.lock();
64  m_funcs.push(thunk);
65  m_empty.m.unlock();
67  }
// Body of the "wait for all scheduled work" routine.
// NOTE(review): the signature line is not present in this listing; the symbol
// index at the end of the listing gives it as `void wait()`.
// Holds m_empty.m for the duration of the call and blocks on m_empty.cv until
// allWorkFinished() returns true.
69  std::lock_guard<std::mutex> lg(m_empty.m);
// NOTE(review): the raw mutex (not a lock object) is passed to cv.wait. That
// form is accepted by std::condition_variable_any; std::condition_variable
// requires a std::unique_lock<std::mutex>. Confirm the declared type of
// m_empty.cv in the header.
70  m_empty.cv.wait(m_empty.m, [this]{return allWorkFinished();});
71  }
72  ThreadPool::~ThreadPool() {
73  wait();
74 
75  m_kill = true;
76  // make sure everyone gets the kill notification
78 
79  m_dispatch_thread.join();
80  for (auto& w : m_workers) {
81  w.t.join();
82  }
83 
84  m_workers.clear();
85  }
86  }
87 }
nta::utils::ThreadPool::m_funcs
std::queue< Thunk > m_funcs
The functions to be executed.
Definition: ThreadPool.h:75
nta::utils::ThreadPool::m_kill
std::atomic< bool > m_kill
Flag signalling that the pool should shut down.
Definition: ThreadPool.h:69
nta::utils::ThreadPool::m_scheduled
Semaphore m_scheduled
Record of scheduled workers.
Definition: ThreadPool.h:71
nta::utils::Semaphore::wait
void wait()
Halts until value of Semaphore is > 0.
Definition: Semaphore.cpp:7
nta::utils::ThreadPool::wait
void wait()
Waits for all scheduled functions to finish execution.
Definition: ThreadPool.cpp:68
nta::utils::ThreadPool::m_available
Semaphore m_available
Record of available workers.
Definition: ThreadPool.h:73
nta::utils::ThreadPool::m_empty
struct nta::utils::ThreadPool::@6 m_empty
Used to keep track of when the pool is empty.
nta::utils::ThreadPool::ThreadPool
ThreadPool(size_t num_threads)
Constructs a ThreadPool with specified number of threads.
Definition: ThreadPool.cpp:5
nta
Definition: Animation2D.h:6
nta::utils::Semaphore::signal
void signal()
Increments value of Semaphore.
Definition: Semaphore.cpp:12
nta::utils::ThreadPool::allWorkFinished
bool allWorkFinished()
Returns whether or not all work is finished.
Definition: ThreadPool.cpp:12
nta::utils::ThreadPool::getAvailableWorker
size_t getAvailableWorker()
Returns id of an available worker.
Definition: ThreadPool.cpp:18
nta::utils::ThreadPool::dispatcher
void dispatcher()
Function run by dispatch thread.
Definition: ThreadPool.cpp:28
nta::utils::ThreadPool::schedule
void schedule(const Thunk &thunk)
Schedules a function to be executed.
Definition: ThreadPool.cpp:62
nta::utils::ThreadPool::worker_func
void worker_func(size_t wid)
Function run by worker threads.
Definition: ThreadPool.cpp:47
nta::utils::ThreadPool::m_dispatch_thread
std::thread m_dispatch_thread
The thread for managing the pool.
Definition: ThreadPool.h:77
nta::utils::ThreadPool::m_workers
std::vector< worker > m_workers
The workers.
Definition: ThreadPool.h:67