36 #include <condition_variable>
41 #include <system_error>
// Constructor: seeds the shared atomic counter with `start` and stores `end`
// in the immutable `_end` member. Both arguments default to 0.
57 explicit ThreadFeeder(
unsigned int start = 0,
unsigned int end = 0) : _atomic_counter(start), _end(
end)
// get_next: hands a counter value back through the reference parameter `next`
// and reports a bool to the caller.
66 bool get_next(
unsigned int &next)
// Atomically adds 1 to the counter; `next` receives the value the counter held
// before the increment. Relaxed memory ordering is requested, i.e. only the
// counter update itself is required to be atomic.
68 next = atomic_fetch_add_explicit(&_atomic_counter, 1u, std::memory_order_relaxed);
// NOTE(review): the statement that produces the bool result is not visible in
// this excerpt; presumably it compares `next` with `_end` - confirm.
// Counter advanced by get_next().
73 std::atomic_uint _atomic_counter;
// Copy of the constructor's `end` argument; const, so fixed for the object's lifetime.
74 const unsigned int _end;
// Executes workloads on the calling thread: begins with the entry whose index
// equals info.thread_id, then keeps pulling further indices from `feeder`.
85 void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder,
const ThreadInfo &
info)
// The first index handled is this thread's own id.
87 unsigned int workload_index =
info.thread_id;
// Invoke the selected workload, passing it this thread's ThreadInfo.
91 workloads[workload_index](
info);
92 }
// Repeat for as long as the feeder reports success; get_next() overwrites
// workload_index through its reference parameter before the next iteration.
while (feeder.get_next(workload_index));
// Thread-affinity helper; takes the index of a single core.
99 void set_thread_affinity(
int core_id)
// The code below is compiled only when the target is none of 64-bit Windows,
// Apple, or OpenBSD.
106 #if !defined(_WIN64) && !defined(__APPLE__) && !defined(__OpenBSD__)
// Adds core_id to the CPU set `set`.
// NOTE(review): the declaration of `set` and the call that applies it are not
// visible in this excerpt.
109 CPU_SET(core_id, &
set);
// Constructs a Thread; `core_pin` defaults to -1.
150 explicit Thread(
int core_pin = -1);
// Copy and move construction/assignment are all deleted. The out-of-line
// constructor hands `this` to the worker std::thread, so a Thread object must
// stay at a fixed address for as long as that thread runs.
152 Thread(
const Thread &) =
delete;
153 Thread &operator=(
const Thread &) =
delete;
154 Thread(Thread &&) =
delete;
155 Thread &operator=(Thread &&) =
delete;
// Gives this thread a job: a pointer to the workload vector, the feeder, and
// the ThreadInfo to run with.
161 void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder,
const ThreadInfo &
info);
// Blocks until the current job is flagged complete, then returns the exception
// pointer recorded for it.
174 std::exception_ptr wait();
// Function executed by the std::thread member (the constructor passes it to
// std::thread together with `this`).
177 void worker_thread();
// Linear mode: clears the pool pointer, so worker_thread() skips its
// `_thread_pool != nullptr` branch.
180 void set_linear_mode()
182 _thread_pool =
nullptr;
// Fanout mode: records the pool and the index range [wake_beg, wake_end) that
// worker_thread() walks (its loop runs from _wake_beg while t < wake_end).
188 void set_fanout_mode(std::list<Thread> *thread_pool,
unsigned int wake_beg,
unsigned int wake_end)
190 _thread_pool = thread_pool;
191 _wake_beg = wake_beg;
192 _wake_end = wake_end;
// OS thread; default-constructed here and assigned in the constructor.
196 std::thread _thread{};
// Workload vector of the current job; worker_thread() resets it to nullptr
// once the job has been processed.
198 std::vector<IScheduler::Workload> *_workloads{
nullptr};
// Feeder dereferenced by worker_thread() when it processes workloads.
199 ThreadFeeder *_feeder{
nullptr};
// Condition variable shared by wait() and worker_thread().
201 std::condition_variable _cv{};
// Predicate worker_thread() sleeps on: set_workload() sets it to true,
// worker_thread() clears it after waking.
202 bool _wait_for_work{
false};
// Predicate wait() sleeps on: starts true, set_workload() sets it to false,
// worker_thread() sets it back to true when the job is done.
203 bool _job_complete{
true};
// Exception recorded by worker_thread(); this is the value wait() returns.
204 std::exception_ptr _current_exception{
nullptr};
// Pool this thread belongs to when in fanout mode; nullptr in linear mode.
206 std::list<Thread> *_thread_pool{
nullptr};
// Index range assigned by set_fanout_mode().
207 unsigned int _wake_beg{0};
208 unsigned int _wake_end{0};
// Stores the requested core pin and immediately launches the worker
// std::thread, bound to this object's worker_thread().
211 Thread::Thread(
int core_pin) : _core_pin(core_pin)
213 _thread = std::thread(&Thread::worker_thread,
this);
// NOTE(review): the two statements below belong to a later definition whose
// signature is not visible in this excerpt (presumably the destructor - confirm).
// Only act while the std::thread is still joinable.
219 if (_thread.joinable())
// Submit a job with a null workload pointer. worker_thread() tests
// `_workloads == nullptr`, so this looks like the shutdown signal - confirm
// against the lines missing from this excerpt.
222 set_workload(
nullptr, feeder, ThreadInfo());
// Records the job and flips the two state flags under the mutex.
228 void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder,
const ThreadInfo &
info)
230 _workloads = workloads;
// The flags are changed while holding _m, the same mutex wait() and
// worker_thread() hold around their condition-variable waits.
238 std::lock_guard<std::mutex> lock(_m);
// Satisfies the predicate worker_thread() is waiting on.
239 _wait_for_work =
true;
// Makes a later wait() block until the worker sets this back to true.
240 _job_complete =
false;
// Blocks the caller until _job_complete is true, then returns
// _current_exception (a null exception_ptr unless the worker recorded one).
245 std::exception_ptr Thread::wait()
// unique_lock rather than lock_guard: condition_variable::wait has to unlock
// and re-lock it while sleeping.
248 std::unique_lock<std::mutex> lock(_m);
// The predicate form re-checks the flag on every wake-up, so spurious wake-ups
// do not return early.
249 _cv.wait(lock, [&] {
return _job_complete; });
251 return _current_exception;
// Body of the worker std::thread started by the constructor.
254 void Thread::worker_thread()
// Apply the core id stored by the constructor.
256 set_thread_affinity(_core_pin);
// Sleep until set_workload() raises _wait_for_work, then clear the flag so the
// next wait blocks again.
260 std::unique_lock<std::mutex> lock(_m);
261 _cv.wait(lock, [&] {
return _wait_for_work; });
262 _wait_for_work =
false;
// Drop any exception left over from an earlier job.
264 _current_exception =
nullptr;
// A null workload vector or a null feeder is treated as a special case.
// NOTE(review): the body of this branch is not visible in this excerpt.
267 if (_workloads ==
nullptr || _feeder ==
nullptr)
// Fanout mode only: the pool pointer is non-null after set_fanout_mode().
273 if (_thread_pool !=
nullptr)
// Move an iterator to list position _wake_beg, clamped to the list size so
// std::advance cannot step past end().
275 auto thread_it = _thread_pool->begin();
276 std::advance(thread_it, std::min(
static_cast<unsigned int>(_thread_pool->size()), _wake_beg));
// Upper bound of the range, capped at num_threads - 1.
277 auto wake_end = std::min(_wake_end,
static_cast<unsigned int>(_info.
num_threads - 1));
// Index and list iterator advance together over [_wake_beg, wake_end).
// NOTE(review): the loop body is not visible in this excerpt.
278 for (
unsigned int t = _wake_beg;
t < wake_end; ++
t, ++thread_it)
// Exception handling is compiled in unless ARM_COMPUTE_EXCEPTIONS_DISABLED is defined.
284 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
// Run workloads on this thread using the stored job description.
288 process_workloads(*_workloads, *_feeder, _info);
290 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
// Remember the in-flight exception; wait() returns this value to its caller.
294 _current_exception = std::current_exception();
// Job finished: forget the workload vector and set the flag wait() checks.
297 _workloads =
nullptr;
298 _job_complete =
true;
// Implementation struct of CPPScheduler; declared final.
305 struct CPPScheduler::Impl final
// Fanout value auto_switch_mode() passes to set_fanout_mode().
307 constexpr
static unsigned int m_default_wake_fanout = 4;
// Records whether a mode has been forced; the values used below are Linear,
// Fanout and None.
313 enum class ModeToggle
// Builds a pool of thread_hint - 1 Thread objects, starting in linear mode with
// a wake fanout of 0. `_threads(_num_threads - 1)` reads _num_threads, which is
// safe because _num_threads is declared before _threads in this struct.
// NOTE(review): _num_threads is unsigned, so a thread_hint of 0 would make
// `_num_threads - 1` wrap around - confirm callers never pass 0.
319 explicit Impl(
unsigned int thread_hint)
320 : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0
U)
// mode_env_v selects the forced mode: "linear" -> Linear, "fanout" -> Fanout,
// anything else -> None.
// NOTE(review): where mode_env_v is read from is not visible in this excerpt.
323 if (mode_env_v ==
"linear")
325 _forced_mode = ModeToggle::Linear;
327 else if (mode_env_v ==
"fanout")
329 _forced_mode = ModeToggle::Fanout;
333 _forced_mode = ModeToggle::None;
// NOTE(review): the remaining statements in this span come from later
// definitions whose signatures are not visible in this excerpt.
// Resize the pool to one fewer than _num_threads, then re-evaluate the mode.
339 _threads.resize(_num_threads - 1);
340 auto_switch_mode(_num_threads);
// The calling thread takes the affinity func returns for index 0.
347 set_thread_affinity(func(0, thread_hint));
// Pool threads are constructed with the core pins func returns for
// i = 1 .. _num_threads - 1.
351 for (
auto i = 1U; i < _num_threads; ++i)
353 _threads.emplace_back(func(i, thread_hint));
355 auto_switch_mode(_num_threads);
// Picks the scheduling mode for `num_threads_to_use` threads.
357 void auto_switch_mode(
unsigned int num_threads_to_use)
// Fanout is chosen when it is forced, or when nothing is forced and more than
// 8 threads will be used.
360 if (_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8))
362 set_fanout_mode(m_default_wake_fanout, num_threads_to_use);
// Message arguments: the resulting wake fanout and the thread count.
// NOTE(review): the call these arguments belong to is not visible in this excerpt.
364 "Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n",
365 this->wake_fanout(), num_threads_to_use);
// Switches every pool thread to linear mode, then records Mode::Linear.
374 void set_linear_mode()
// Iterate by reference: Thread is non-copyable and each element is modified in place.
376 for (
auto &thread : _threads)
378 thread.set_linear_mode();
380 _mode = Mode::Linear;
// Configures the pool for fanout wake-up and records the mode and fanout used.
383 void set_fanout_mode(
unsigned int wake_fanout,
unsigned int num_threads_to_use)
// Limit the requested fanout to num_threads_to_use - 1, but never below 2.
// NOTE(review): num_threads_to_use is unsigned, so `num_threads_to_use - 1`
// wraps around if 0 is ever passed - confirm callers never do.
386 const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1));
387 auto thread_it = _threads.begin();
// The counter i (starting at 1) and the list iterator advance together.
388 for (
auto i = 1U; i < num_threads_to_use; ++i, ++thread_it)
// Thread i is given the range
// [i * fanout - 1, min((i + 1) * fanout - 1, num_threads_to_use - 1)).
390 const auto wake_begin = i * actual_wake_fanout - 1;
391 const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1);
392 thread_it->set_fanout_mode(&_threads, wake_begin, wake_end);
// Every remaining pool thread gets the empty range (0, 0).
// NOTE(review): no iterator advance is visible in this excerpt for this loop.
395 while (thread_it != _threads.end())
397 thread_it->set_fanout_mode(&_threads, 0U, 0U);
400 _mode = Mode::Fanout;
// Store the clamped value, not the requested one.
401 _wake_fanout = actual_wake_fanout;
// Const accessor; auto_switch_mode() and run_workloads() read the fanout through it.
407 unsigned int wake_fanout()
const
416 void run_workloads(std::vector<IScheduler::Workload> &workloads);
// Thread count; initialised from the constructor's thread_hint.
418 unsigned int _num_threads;
// Pool of worker threads. Thread deletes its copy and move operations, and
// std::list can hold such elements.
419 std::list<Thread> _threads;
// Active mode; the values used in this file are Linear and Fanout.
421 Mode _mode{Mode::Linear};
// Forced mode chosen in the constructor; None means auto_switch_mode() decides
// by thread count.
422 ModeToggle _forced_mode{ModeToggle::None};
// Fanout stored by set_fanout_mode(); 0 until then.
423 unsigned int _wake_fanout{0};
457 return _impl->num_threads();
460 #ifndef DOXYGEN_SKIP_THIS
// Distributes `workloads` over the pool threads and the calling thread, waits
// for the pool threads, and rethrows a recorded exception.
461 void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
// Never use more threads than there are workloads.
468 const unsigned int num_threads_to_use = std::min(_impl->num_threads(),
static_cast<unsigned int>(workloads.size()));
// Guard for the zero-thread / zero-workload case.
// NOTE(review): the body of this branch is not visible in this excerpt.
469 if (num_threads_to_use < 1)
// Re-select linear or fanout mode for this thread count.
474 _impl->auto_switch_mode(num_threads_to_use);
475 int num_threads_to_start = 0;
476 switch (_impl->mode())
478 case CPPScheduler::Impl::Mode::Fanout:
// Fanout: only wake_fanout - 1 threads are counted here.
480 num_threads_to_start =
static_cast<int>(_impl->wake_fanout()) - 1;
483 case CPPScheduler::Impl::Mode::Linear:
// Linear: every pool thread in use is counted.
486 num_threads_to_start =
static_cast<int>(num_threads_to_use) - 1;
// The feeder starts at num_threads_to_use because process_workloads() begins
// each thread on the index equal to its thread_id (assumes thread ids are all
// below num_threads_to_use - confirm).
// NOTE(review): workloads.size() is converted to the constructor's
// unsigned int parameter here.
490 ThreadFeeder feeder(num_threads_to_use, workloads.size());
493 info.num_threads = num_threads_to_use;
495 auto thread_it = _impl->_threads.begin();
// Hand the job to num_threads_to_use - 1 pool threads; the calling thread
// processes workloads as well (see the process_workloads call below).
497 for (;
t < num_threads_to_use - 1; ++
t, ++thread_it)
500 thread_it->set_workload(&workloads, feeder,
info);
// Second pass from the start of the pool over the first num_threads_to_start
// threads. NOTE(review): the loop body is not visible in this excerpt.
502 thread_it = _impl->_threads.begin();
503 for (
int i = 0; i < num_threads_to_start; ++i, ++thread_it)
508 std::exception_ptr last_exception =
nullptr;
// Exception handling is compiled in unless ARM_COMPUTE_EXCEPTIONS_DISABLED is defined.
509 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
// The calling thread takes part in the work.
513 process_workloads(workloads, feeder,
info);
514 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
// Remember an exception raised on the calling thread.
518 last_exception = std::current_exception();
// Wait for each pool thread that was given the job; a non-null exception from
// a later thread replaces any recorded earlier.
524 thread_it = _impl->_threads.begin();
525 for (
unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it)
527 std::exception_ptr current_exception = thread_it->wait();
528 if (current_exception)
530 last_exception = current_exception;
// Re-raise the recorded exception on the calling thread.
535 std::rethrow_exception(last_exception);
537 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
// std::system_error is reported on stderr with its error code and message.
539 catch (
const std::system_error &e)
541 std::cerr <<
"Caught system_error with code " << e.code() <<
" meaning " << e.what() <<
'\n';
549 schedule_common(kernel, hints, window, tensors);
555 schedule_common(kernel, hints, kernel->
window(), tensors);