#include <atomic>
#include <condition_variable>
#include <system_error>
// (remaining includes elided in this listing)

// Hands out workload indices to the worker threads, one atomic increment at a time.
class ThreadFeeder
{
public:
    explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0)
        : _atomic_counter(start), _end(end)
    {
    }
    // Stores the next index in 'next'; returns true while the index is still inside the range.
    bool get_next(unsigned int &next)
    {
        next = atomic_fetch_add_explicit(&_atomic_counter, 1u, std::memory_order_relaxed);
        return next < _end;
    }

private:
    std::atomic_uint   _atomic_counter;
    const unsigned int _end;
};
// Run workloads[info.thread_id] first, then keep asking the feeder for the next workload index.
void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
    unsigned int workload_index = info.thread_id;
    do
    {
        ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size());
        workloads[workload_index](info);
    }
    while(feeder.get_next(workload_index));
}
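Taken together, ThreadFeeder and process_workloads() implement dynamic self-scheduling: each thread runs the workload matching its own thread_id first, then keeps pulling fresh indices from the shared atomic counter, so faster threads simply end up executing more workloads. Below is a minimal standalone sketch of the same idiom using only the standard library; every name in it is illustrative and nothing here is part of the library itself.

#include <atomic>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

int main()
{
    const unsigned int num_threads = 3;
    std::vector<std::function<void(unsigned int)>> workloads;
    for(unsigned int w = 0; w < 10; ++w)
    {
        workloads.emplace_back([w](unsigned int tid) { std::printf("workload %u on thread %u\n", w, tid); });
    }

    // Indices [num_threads, workloads.size()) are handed out dynamically, as ThreadFeeder does.
    std::atomic_uint next_index{ num_threads };

    auto worker = [&](unsigned int tid)
    {
        unsigned int index = tid; // each thread starts with the workload matching its id
        do
        {
            workloads[index](tid);
            index = next_index.fetch_add(1u, std::memory_order_relaxed);
        }
        while(index < workloads.size());
    };

    std::vector<std::thread> pool;
    for(unsigned int t = 0; t < num_threads; ++t)
    {
        pool.emplace_back(worker, t);
    }
    for(auto &t : pool)
    {
        t.join();
    }
    return 0;
}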
// Pin the calling thread to the given core (no-op on platforms without sched_setaffinity).
void set_thread_affinity(int core_id)
{
#if !defined(_WIN64) && !defined(__APPLE__) && !defined(__OpenBSD__)
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core_id, &set);
    ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), "Error setting thread affinity");
#endif
}
// Worker thread wrapper: owns a std::thread that sleeps on a condition variable until it is handed workloads.
class Thread final
{
public:
    // Start a new worker thread, optionally pinned to the given core.
    explicit Thread(int core_pin = -1);

    Thread(const Thread &) = delete;
    Thread &operator=(const Thread &) = delete;
    Thread(Thread &&)                 = delete;
    Thread &operator=(Thread &&)      = delete;

    // Destructor: makes the worker thread join.
    ~Thread();

    // Set the workloads, feeder and thread info to use for the next job.
    void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);

    // Request the worker thread to start executing the workloads.
    void start();

    // Wait for the current job to complete; rethrows any exception caught by the worker.
    void wait();

    // Function run by the worker thread.
    void worker_thread();

    // Linear mode: this thread wakes no peers.
    void set_linear_mode()
    {
        _thread_pool = nullptr;
        _wake_beg    = 0;
        _wake_end    = 0;
    }

    // Fanout mode: when started, this thread wakes peers [wake_beg, wake_end) of the pool.
    void set_fanout_mode(std::list<Thread> *thread_pool, unsigned int wake_beg, unsigned int wake_end)
    {
        _thread_pool = thread_pool;
        _wake_beg    = wake_beg;
        _wake_end    = wake_end;
    }

private:
    std::thread                        _thread{};
    ThreadInfo                         _info{};
    std::vector<IScheduler::Workload> *_workloads{ nullptr };
    ThreadFeeder                      *_feeder{ nullptr };
    std::mutex                         _m{};
    std::condition_variable            _cv{};
    bool                               _wait_for_work{ false };
    bool                               _job_complete{ true };
    std::exception_ptr                 _current_exception{ nullptr };
    int                                _core_pin{ -1 };
    std::list<Thread>                 *_thread_pool{ nullptr };
    unsigned int                       _wake_beg{ 0 };
    unsigned int                       _wake_end{ 0 };
};
Thread::Thread(int core_pin)
    : _core_pin(core_pin)
{
    _thread = std::thread(&Thread::worker_thread, this);
}

Thread::~Thread()
{
    // Make sure the worker thread has ended before destruction.
    if(_thread.joinable())
    {
        ThreadFeeder feeder;
        set_workload(nullptr, feeder, ThreadInfo()); // a null workload tells the worker to exit
        start();
        _thread.join();
    }
}
void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
    _workloads = workloads;
    _feeder    = &feeder;
    _info      = info;
}

// Wake the worker thread and mark the job as pending.
void Thread::start()
{
    {
        std::lock_guard<std::mutex> lock(_m);
        _wait_for_work = true;
        _job_complete  = false;
    }
    _cv.notify_one();
}

// Block until the worker reports completion, then rethrow any exception it caught.
void Thread::wait()
{
    {
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _job_complete; });
    }
    if(_current_exception)
    {
        std::rethrow_exception(_current_exception);
    }
}
void Thread::worker_thread()
{
    set_thread_affinity(_core_pin);

    while(true)
    {
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _wait_for_work; });
        _wait_for_work = false;

        _current_exception = nullptr;

        // Exit if the worker thread has not been fed with workloads.
        if(_workloads == nullptr || _feeder == nullptr)
        {
            return;
        }

        // In fanout mode, wake the peer threads this thread is responsible for.
        if(_thread_pool != nullptr)
        {
            auto thread_it = _thread_pool->begin();
            std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg));
            auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1));
            for(unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it)
            {
                thread_it->start();
            }
        }

#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        try
        {
#endif
            process_workloads(*_workloads, *_feeder, _info);
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        }
        catch(...)
        {
            _current_exception = std::current_exception();
        }
#endif
        _workloads    = nullptr;
        _job_complete = true;
        lock.unlock();
        _cv.notify_one();
    }
}
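The set_workload()/start()/wait()/worker_thread() quartet is a two-flag condition-variable handshake: _wait_for_work tells the worker a job is pending, _job_complete tells the caller the job has finished, and both sides sleep on the same condition variable under the same mutex. Here is a stripped-down standalone sketch of that handshake for a single worker and a single job; the names are illustrative, not the library's.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

std::mutex              m;
std::condition_variable cv;
bool wait_for_work = false; // set by the caller, consumed by the worker
bool job_complete  = true;  // set by the worker, consumed by the caller

void worker()
{
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [] { return wait_for_work; }); // sleep until a job is pending
    wait_for_work = false;
    std::puts("worker: running the job");
    job_complete = true;
    lock.unlock();
    cv.notify_one(); // wake the caller blocked below
}

int main()
{
    std::thread t(worker);
    {
        std::lock_guard<std::mutex> lock(m); // equivalent of Thread::start()
        wait_for_work = true;
        job_complete  = false;
    }
    cv.notify_one();
    {
        std::unique_lock<std::mutex> lock(m); // equivalent of Thread::wait()
        cv.wait(lock, [] { return job_complete; });
    }
    std::puts("caller: job observed complete");
    t.join();
    return 0;
}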
struct CPPScheduler::Impl final
{
    constexpr static unsigned int m_default_wake_fanout = 4;
    enum class Mode { Linear, Fanout };
    enum class ModeToggle { None, Linear, Fanout };

    explicit Impl(unsigned int thread_hint)
        : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U)
    {
        // The scheduling mode can be forced through the ARM_COMPUTE_CPP_SCHEDULER_MODE environment
        // variable; otherwise it is chosen automatically in auto_switch_mode().
        const std::string mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE"));
        if(mode_env_v == "linear")
        {
            _forced_mode = ModeToggle::Linear;
        }
        else if(mode_env_v == "fanout")
        {
            _forced_mode = ModeToggle::Fanout;
        }
        else
        {
            _forced_mode = ModeToggle::None;
        }
    }
    void set_num_threads(unsigned int num_threads, unsigned int thread_hint)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;
        _threads.resize(_num_threads - 1);
        auto_switch_mode(_num_threads);
    }
    void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;

        // Pin the main thread, then re-create the worker pool with one pinned thread per remaining slot.
        set_thread_affinity(func(0, thread_hint));
        _threads.clear();
        for(auto i = 1U; i < _num_threads; ++i)
        {
            _threads.emplace_back(func(i, thread_hint));
        }
        auto_switch_mode(_num_threads);
    }
    void auto_switch_mode(unsigned int num_threads_to_use)
    {
        // A forced mode always wins; otherwise fanout is only worthwhile with more than 8 threads.
        if(_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8))
        {
            set_fanout_mode(m_default_wake_fanout, num_threads_to_use);
        }
        else
        {
            set_linear_mode();
        }
    }
    void set_linear_mode()
    {
        for(auto &thread : _threads)
        {
            thread.set_linear_mode();
        }
        _mode        = Mode::Linear;
        _wake_fanout = 0U;
    }
    void set_fanout_mode(unsigned int wake_fanout, unsigned int num_threads_to_use)
    {
        // Clamp the fanout to [2, num_threads_to_use - 1].
        const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1));
        auto       thread_it          = _threads.begin();
        for(auto i = 1U; i < num_threads_to_use; ++i, ++thread_it)
        {
            const auto wake_begin = i * actual_wake_fanout - 1;
            const auto wake_end   = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1);
            thread_it->set_fanout_mode(&_threads, wake_begin, wake_end);
        }
        // Reset the wake-up schedule of any remaining threads in the pool.
        while(thread_it != _threads.end())
        {
            thread_it->set_fanout_mode(&_threads, 0U, 0U);
            ++thread_it;
        }
        _mode        = Mode::Fanout;
        _wake_fanout = actual_wake_fanout;
    }
    unsigned int num_threads() const { return _num_threads; }
    unsigned int wake_fanout() const { return _wake_fanout; }
    Mode mode() const { return _mode; }

    void run_workloads(std::vector<IScheduler::Workload> &workloads);

    unsigned int      _num_threads;
    std::list<Thread> _threads;
    Mode              _mode{ Mode::Linear };
    ModeToggle        _forced_mode{ ModeToggle::None };
    unsigned int      _wake_fanout{ 0 };
};
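To make the fanout wake schedule concrete: with 16 threads and the default fanout of 4, the main thread wakes pool workers 0-2, worker 0 then wakes 3-6, worker 1 wakes 7-10 and worker 2 wakes 11-14, so all 15 pool workers are running after two waves of notifications instead of 15 sequential notify calls from the main thread. The short standalone program below reproduces the arithmetic from set_fanout_mode() above and prints that schedule; the thread count and fanout are example values only.

#include <algorithm>
#include <cstdio>

int main()
{
    // Example numbers only: 16 threads in total (15 pool workers + the main thread), wake fanout 4.
    const unsigned int num_threads_to_use = 16;
    const unsigned int wake_fanout        = 4;
    const unsigned int actual_wake_fanout = std::max(2u, std::min(wake_fanout, num_threads_to_use - 1));

    std::printf("main thread wakes pool workers [0, %u)\n", actual_wake_fanout - 1);
    for(unsigned int i = 1; i < num_threads_to_use; ++i)
    {
        const unsigned int wake_begin = i * actual_wake_fanout - 1;
        const unsigned int wake_end   = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1);
        if(wake_begin < wake_end)
        {
            std::printf("pool worker %u wakes pool workers [%u, %u)\n", i - 1, wake_begin, wake_end);
        }
    }
    return 0;
}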
void CPPScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func)
{
    _impl->set_num_threads_with_affinity(num_threads, num_threads_hint(), func);
}

unsigned int CPPScheduler::num_threads() const
{
    return _impl->num_threads();
}
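For the affinity entry point, BindFunc is a std::function<int(int, int)> that maps a thread index (and the thread-count hint) to the logical core that thread should be pinned to; the mapping itself is entirely the caller's choice. A minimal caller-side sketch, assuming a hypothetical device whose big cluster occupies logical cores 4-7:

// Hypothetical mapping: spread the scheduler's threads over cores 4..7.
auto bind_to_big_cores = [](int thread_index, int /*thread_hint*/) -> int
{
    const int first_big_core = 4; // assumption about this particular device, not a library default
    return first_big_core + (thread_index % 4);
};
CPPScheduler::get().set_num_threads_with_affinity(4, bind_to_big_cores);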
#ifndef DOXYGEN_SKIP_THIS
void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
{
    const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
    if(num_threads_to_use < 1)
    {
        return;
    }
    // Re-adjust the mode if fewer threads than the pool holds are actually needed.
    _impl->auto_switch_mode(num_threads_to_use);
    int num_threads_to_start = 0;
    switch(_impl->mode())
    {
        case CPPScheduler::Impl::Mode::Fanout:
            // In fanout mode the caller only wakes the first wave of workers.
            num_threads_to_start = static_cast<int>(_impl->wake_fanout()) - 1;
            break;
        case CPPScheduler::Impl::Mode::Linear:
        default:
            num_threads_to_start = static_cast<int>(num_threads_to_use) - 1;
            break;
    }
    ThreadFeeder feeder(num_threads_to_use, workloads.size());
    ThreadInfo   info;
    info.num_threads = num_threads_to_use;

    // Assign workloads to num_threads_to_use - 1 workers; the last index is kept for the main thread.
    unsigned int t         = 0;
    auto         thread_it = _impl->_threads.begin();
    for(; t < num_threads_to_use - 1; ++t, ++thread_it)
    {
        info.thread_id = t;
        thread_it->set_workload(&workloads, feeder, info);
    }
    thread_it = _impl->_threads.begin();
    for(int i = 0; i < num_threads_to_start; ++i, ++thread_it)
    {
        thread_it->start();
    }
    info.thread_id = t; // the main thread participates as the last worker
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    try
    {
#endif
        process_workloads(workloads, feeder, info);
        // Wait for every worker to flag completion; wait() rethrows worker exceptions.
        thread_it = _impl->_threads.begin();
        for(unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it)
        {
            thread_it->wait();
        }
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    }
    catch(const std::system_error &e)
    {
        std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
    }
#endif
}
#endif /* DOXYGEN_SKIP_THIS */
void CPPScheduler::schedule_op(ICPPKernel *kernel, const Hints &hints, const Window &window, ITensorPack &tensors)
{
    schedule_common(kernel, hints, window, tensors);
}

void CPPScheduler::schedule(ICPPKernel *kernel, const Hints &hints)
{
    ITensorPack tensors;
    schedule_common(kernel, hints, kernel->window(), tensors);
}
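Callers never touch Thread, ThreadFeeder or run_workloads() directly: a configured kernel is handed to the scheduler, which splits the kernel's window into workloads and pushes them through the machinery above. A rough usage sketch, assuming an already configured ICPPKernel named my_kernel, that the arm_compute headers are on the include path, and that splitting along Window::DimY suits the kernel:

#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/runtime/CPP/CPPScheduler.h"

using namespace arm_compute;

void run_my_kernel(ICPPKernel *my_kernel) // hypothetical helper; my_kernel is assumed configured
{
    CPPScheduler &scheduler = CPPScheduler::get(); // scheduler singleton
    scheduler.set_num_threads(4);                  // 0 would fall back to num_threads_hint()
    scheduler.schedule(my_kernel, IScheduler::Hints(Window::DimY)); // one workload per window split
}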